From bc7f794112ebe24da64af024d2fc683c1e39f9e0 Mon Sep 17 00:00:00 2001 From: Vinicius Carvalho Date: Wed, 5 Jul 2017 13:25:50 -0400 Subject: [PATCH] Upgrading to reactor core 3.1.0 - replaced setCancellation -> onDispose on FluxSink - replaced Flux.intervalMillis -> Flux.interval(Duration) in tests Moving reactor dependency to release train --- pom.xml | 6 +++-- ...ageChannelToInputFluxParameterAdapter.java | 5 ++-- .../reactive/StreamEmitterBasicTests.java | 22 ++++++++-------- .../StreamEmitterValidationTests.java | 25 +++++++++++-------- 4 files changed, 33 insertions(+), 25 deletions(-) diff --git a/pom.xml b/pom.xml index 1746c22df..e6754e2a3 100644 --- a/pom.xml +++ b/pom.xml @@ -24,7 +24,7 @@ 1.2.1 1.0.0.RELEASE 1.0.0.RELEASE - 3.0.5.RELEASE + Bismuth-M2 3.0.3 2.1 @@ -87,8 +87,10 @@ io.projectreactor - reactor-core + reactor-bom ${reactor.version} + pom + import io.reactivex diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java index 5f1514ecc..87ab471e9 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java @@ -33,6 +33,7 @@ import org.springframework.util.Assert; * {@link MessageChannel} to a {@link Flux}. * @author Marius Bogoevici * @author Ilayaperumal Gopinathan + * @author Vinicius Carvalho */ public class MessageChannelToInputFluxParameterAdapter implements StreamListenerParameterAdapter, SubscribableChannel> { @@ -64,7 +65,7 @@ public class MessageChannelToInputFluxParameterAdapter } }; bindingTarget.subscribe(messageHandler); - emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler)); + emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler)); }).publish().autoConnect(); } else { @@ -81,7 +82,7 @@ public class MessageChannelToInputFluxParameterAdapter } }; bindingTarget.subscribe(messageHandler); - emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler)); + emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler)); }).publish().autoConnect(); } } diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java index a6c477c1a..14005480c 100644 --- a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.stream.reactive; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -45,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat; /** * @author Soby Chacko * @author Artem Bilan + * @author Vinicius Carvalho */ public class StreamEmitterBasicTests { @@ -191,7 +193,7 @@ public class StreamEmitterBasicTests { @StreamEmitter public void emit(@Output(Source.OUTPUT) FluxSender output) { - output.send(Flux.intervalMillis(1) + output.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l) .map(String::toUpperCase)); } @@ -204,7 +206,7 @@ public class StreamEmitterBasicTests { @StreamEmitter @Output(Source.OUTPUT) public void emit(FluxSender output) { - output.send(Flux.intervalMillis(1) + output.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l) .map(String::toUpperCase)); } @@ -218,11 +220,11 @@ public class StreamEmitterBasicTests { public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1, @Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2, @Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) { - output1.send(Flux.intervalMillis(1) + output1.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); - output2.send(Flux.intervalMillis(1) + output2.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); - output3.send(Flux.intervalMillis(1) + output3.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -234,20 +236,20 @@ public class StreamEmitterBasicTests { @StreamEmitter @Output(TestMultiOutboundChannels.OUTPUT1) public Flux emit1() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } @StreamEmitter @Output(TestMultiOutboundChannels.OUTPUT2) public Flux emit2() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } @StreamEmitter public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) { - outputX.send(Flux.intervalMillis(1) + outputX.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -271,7 +273,7 @@ public class StreamEmitterBasicTests { @StreamEmitter @Output(TestMultiOutboundChannels.OUTPUT1) public Flux emit1() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } } @@ -281,7 +283,7 @@ public class StreamEmitterBasicTests { @StreamEmitter @Output(TestMultiOutboundChannels.OUTPUT2) public Flux emit2() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello FooBar!!" + l); } } diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java index c5d1be2cf..0d50e6500 100644 --- a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java @@ -16,6 +16,8 @@ package org.springframework.cloud.stream.reactive; +import java.time.Duration; + import org.junit.Test; import reactor.core.publisher.Flux; @@ -41,6 +43,7 @@ import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessag /** * @author Soby Chacko + * @author Vinicius Carvalho */ public class StreamEmitterValidationTests { @@ -191,7 +194,7 @@ public class StreamEmitterValidationTests { @StreamEmitter @Output(Source.OUTPUT) public void receive(@Output(Source.OUTPUT) FluxSender output) { - output.send(Flux.intervalMillis(1) + output.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -202,7 +205,7 @@ public class StreamEmitterValidationTests { @StreamEmitter public Flux emit() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } } @@ -213,7 +216,7 @@ public class StreamEmitterValidationTests { @StreamEmitter public void emit(FluxSender output) { - output.send(Flux.intervalMillis(1) + output.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -224,7 +227,7 @@ public class StreamEmitterValidationTests { @StreamEmitter public Flux emit(@Output(Source.OUTPUT) FluxSender output) { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } } @@ -236,7 +239,7 @@ public class StreamEmitterValidationTests { @StreamEmitter @Output(Source.OUTPUT) public Flux receive(FluxSender output) { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } } @@ -249,11 +252,11 @@ public class StreamEmitterValidationTests { public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1, @Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2, FluxSender output3) { - output1.send(Flux.intervalMillis(1) + output1.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); - output2.send(Flux.intervalMillis(1) + output2.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); - output3.send(Flux.intervalMillis(1) + output3.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -265,7 +268,7 @@ public class StreamEmitterValidationTests { @StreamEmitter @Output("") public void receive(FluxSender output) { - output.send(Flux.intervalMillis(1) + output.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -276,7 +279,7 @@ public class StreamEmitterValidationTests { @StreamEmitter public void emit(@Output("") FluxSender output1) { - output1.send(Flux.intervalMillis(1) + output1.send(Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l)); } } @@ -289,7 +292,7 @@ public class StreamEmitterValidationTests { @Output(Source.OUTPUT) @Input(Processor.INPUT) public Flux emit() { - return Flux.intervalMillis(1) + return Flux.interval(Duration.ofMillis(1)) .map(l -> "Hello World!!" + l); } }