diff --git a/pom.xml b/pom.xml
index 1746c22df..e6754e2a3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -24,7 +24,7 @@
1.2.1
1.0.0.RELEASE
1.0.0.RELEASE
- 3.0.5.RELEASE
+ Bismuth-M2
3.0.3
2.1
@@ -87,8 +87,10 @@
io.projectreactor
- reactor-core
+ reactor-bom
${reactor.version}
+ pom
+ import
io.reactivex
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java
index 5f1514ecc..87ab471e9 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputFluxParameterAdapter.java
@@ -33,6 +33,7 @@ import org.springframework.util.Assert;
* {@link MessageChannel} to a {@link Flux}.
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
+ * @author Vinicius Carvalho
*/
public class MessageChannelToInputFluxParameterAdapter
implements StreamListenerParameterAdapter, SubscribableChannel> {
@@ -64,7 +65,7 @@ public class MessageChannelToInputFluxParameterAdapter
}
};
bindingTarget.subscribe(messageHandler);
- emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler));
+ emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler));
}).publish().autoConnect();
}
else {
@@ -81,7 +82,7 @@ public class MessageChannelToInputFluxParameterAdapter
}
};
bindingTarget.subscribe(messageHandler);
- emitter.setCancellation(() -> bindingTarget.unsubscribe(messageHandler));
+ emitter.onDispose(() -> bindingTarget.unsubscribe(messageHandler));
}).publish().autoConnect();
}
}
diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java
index a6c477c1a..14005480c 100644
--- a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java
+++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterBasicTests.java
@@ -17,6 +17,7 @@
package org.springframework.cloud.stream.reactive;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -45,6 +46,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
* @author Artem Bilan
+ * @author Vinicius Carvalho
*/
public class StreamEmitterBasicTests {
@@ -191,7 +193,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
public void emit(@Output(Source.OUTPUT) FluxSender output) {
- output.send(Flux.intervalMillis(1)
+ output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
@@ -204,7 +206,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(Source.OUTPUT)
public void emit(FluxSender output) {
- output.send(Flux.intervalMillis(1)
+ output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l)
.map(String::toUpperCase));
}
@@ -218,11 +220,11 @@ public class StreamEmitterBasicTests {
public void emit(@Output(TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender output3) {
- output1.send(Flux.intervalMillis(1)
+ output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
- output2.send(Flux.intervalMillis(1)
+ output2.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
- output3.send(Flux.intervalMillis(1)
+ output3.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -234,20 +236,20 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux emit1() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux emit2() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
@StreamEmitter
public void emit3(@Output(TestMultiOutboundChannels.OUTPUT3) FluxSender outputX) {
- outputX.send(Flux.intervalMillis(1)
+ outputX.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -271,7 +273,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT1)
public Flux emit1() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -281,7 +283,7 @@ public class StreamEmitterBasicTests {
@StreamEmitter
@Output(TestMultiOutboundChannels.OUTPUT2)
public Flux emit2() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello FooBar!!" + l);
}
}
diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java
index c5d1be2cf..0d50e6500 100644
--- a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java
+++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamEmitterValidationTests.java
@@ -16,6 +16,8 @@
package org.springframework.cloud.stream.reactive;
+import java.time.Duration;
+
import org.junit.Test;
import reactor.core.publisher.Flux;
@@ -41,6 +43,7 @@ import static org.springframework.cloud.stream.reactive.StreamEmitterErrorMessag
/**
* @author Soby Chacko
+ * @author Vinicius Carvalho
*/
public class StreamEmitterValidationTests {
@@ -191,7 +194,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output(Source.OUTPUT)
public void receive(@Output(Source.OUTPUT) FluxSender output) {
- output.send(Flux.intervalMillis(1)
+ output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -202,7 +205,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public Flux emit() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -213,7 +216,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public void emit(FluxSender output) {
- output.send(Flux.intervalMillis(1)
+ output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -224,7 +227,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public Flux emit(@Output(Source.OUTPUT) FluxSender output) {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -236,7 +239,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output(Source.OUTPUT)
public Flux receive(FluxSender output) {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}
@@ -249,11 +252,11 @@ public class StreamEmitterValidationTests {
public void emit(@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT1) FluxSender output1,
@Output(StreamEmitterBasicTests.TestMultiOutboundChannels.OUTPUT2) FluxSender output2,
FluxSender output3) {
- output1.send(Flux.intervalMillis(1)
+ output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
- output2.send(Flux.intervalMillis(1)
+ output2.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
- output3.send(Flux.intervalMillis(1)
+ output3.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -265,7 +268,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
@Output("")
public void receive(FluxSender output) {
- output.send(Flux.intervalMillis(1)
+ output.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -276,7 +279,7 @@ public class StreamEmitterValidationTests {
@StreamEmitter
public void emit(@Output("") FluxSender output1) {
- output1.send(Flux.intervalMillis(1)
+ output1.send(Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l));
}
}
@@ -289,7 +292,7 @@ public class StreamEmitterValidationTests {
@Output(Source.OUTPUT)
@Input(Processor.INPUT)
public Flux emit() {
- return Flux.intervalMillis(1)
+ return Flux.interval(Duration.ofMillis(1))
.map(l -> "Hello World!!" + l);
}
}