Upgrading to reactor core 3.1.0

- replaced setCancellation -> onDispose on FluxSink
 - replaced Flux.intervalMillis -> Flux.interval(Duration) in tests

Moving reactor dependency to release train
This commit is contained in:
Vinicius Carvalho
2017-07-05 13:25:50 -04:00
committed by Soby Chacko
parent c22094f7cc
commit bc7f794112
4 changed files with 33 additions and 25 deletions

View File

@@ -24,7 +24,7 @@
<rxjava-reactive-streams.version>1.2.1</rxjava-reactive-streams.version>
<spring.tuple.version>1.0.0.RELEASE</spring.tuple.version>
<spring.integration.tuple.version>1.0.0.RELEASE</spring.integration.tuple.version>
<reactor.version>3.0.5.RELEASE</reactor.version>
<reactor.version>Bismuth-M2</reactor.version>
<kryo-shaded.version>3.0.3</kryo-shaded.version>
<objenesis.version>2.1</objenesis.version>
</properties>
@@ -87,8 +87,10 @@
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<artifactId>reactor-bom</artifactId>
<version>${reactor.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>

View File

@@ -33,6 +33,7 @@ import org.springframework.util.Assert;
* {@link MessageChannel} to a {@link Flux}.
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Vinicius Carvalho
*/
public class MessageChannelToInputFluxParameterAdapter
implements StreamListenerParameterAdapter<Flux<?>, SubscribableChannel> {
@@ -64,7 +65,7 @@ public class MessageChannelToInputFluxParameterAdapter
}
};
bindingTarget.subscribe(messageHandler);
emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler));
emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler));
}).publish().autoConnect();
}
else {
@@ -81,7 +82,7 @@ public class MessageChannelToInputFluxParameterAdapter
}
};
bindingTarget.subscribe(messageHandler);
emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler));
emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler));
}).publish().autoConnect();
}
}

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.reactive;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
* @author Artem Bilan
* @author Vinicius Carvalho
*/
public class StreamEmitterBasicTests {
@@ -191,7 +193,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
public void emit(@Output(Source.OUTPUT) FluxSender output) {
output.send(Flux.intervalMillis(1)
output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
@@ -204,7 +206,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
output.send(Flux.intervalMillis(1)
output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
@@ -218,11 +220,11 @@ public class StreamEmitterBasicTests {
public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) {
output1.send(Flux.intervalMillis(1)
output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
output2.send(Flux.intervalMillis(1)
output2.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
output3.send(Flux.intervalMillis(1)
output3.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -234,20 +236,20 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux<String> emit1() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux<String> emit2() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) {
outputX.send(Flux.intervalMillis(1)
outputX.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -271,7 +273,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux<String> emit1() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -281,7 +283,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux<String> emit2() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello FooBar!!" + l);
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.reactive;
import java.time.Duration;
import org.junit.Test;
import reactor.core.publisher.Flux;
@@ -41,6 +43,7 @@ import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessag
/**
* @author Soby Chacko
* @author Vinicius Carvalho
*/
public class StreamEmitterValidationTests {
@@ -191,7 +194,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output(Source.OUTPUT)
public void receive(@Output(Source.OUTPUT) FluxSender output) {
output.send(Flux.intervalMillis(1)
output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -202,7 +205,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public Flux<String> emit() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -213,7 +216,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public void emit(FluxSender output) {
output.send(Flux.intervalMillis(1)
output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -224,7 +227,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public Flux<String> emit(@Output(Source.OUTPUT) FluxSender output) {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -236,7 +239,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output(Source.OUTPUT)
public Flux<String> receive(FluxSender output) {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -249,11 +252,11 @@ public class StreamEmitterValidationTests {
public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
FluxSender output3) {
output1.send(Flux.intervalMillis(1)
output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
output2.send(Flux.intervalMillis(1)
output2.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
output3.send(Flux.intervalMillis(1)
output3.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -265,7 +268,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output("")
public void receive(FluxSender output) {
output.send(Flux.intervalMillis(1)
output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -276,7 +279,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public void emit(@Output("") FluxSender output1) {
output1.send(Flux.intervalMillis(1)
output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -289,7 +292,7 @@ public class StreamEmitterValidationTests {
@Output(Source.OUTPUT)
@Input(Processor.INPUT)
public Flux<String> emit() {
return Flux.intervalMillis(1)
return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}