diff --git a/pom.xml b/pom.xml
index fe61cd377..1457177ab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,9 +21,10 @@
1.71.1.10
+ 1.2.11.0.0.RELEASE1.0.0.RELEASE
- 3.0.2.RELEASE
+ 3.0.4.RELEASE3.0.32.1
diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
index 61aa8d0bd..d61139d07 100644
--- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
+++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc
@@ -502,7 +502,12 @@ Reactive programming support requires Java 1.8.
[NOTE]
====
-Reactive programming support requires the use of Reactor 3.0.0 and higher. `spring-cloud-stream-reactive` will transitively retrieve the proper version, but it is possible for the project structure to manage the version of the `io.projectreactor:reactor-core` to an earlier release, especially when using Maven. This is the case for projects generated via Spring Initializr with Spring Boot 1.4, which will override the Reactor version to `2.0.8.RELEASE`. In such cases you must ensure that the proper version of the artifact is released. This can be simply achieved by adding a direct dependency on `io.projectreactor:reactor-core` with a version of `3.0.0.RC1` or later on your project.
+As of Spring Cloud Stream 1.1.1 and later (starting with release train Brooklyn.SR2), reactive programming support requires the use of Reactor 3.0.4.RELEASE and higher.
+Earlier Reactor versions (including 3.0.1.RELEASE, 3.0.2.RELEASE and 3.0.3.RELEASE) are not supported.
+`spring-cloud-stream-reactive` will transitively retrieve the proper version, but it is possible for the project structure to manage the version of the `io.projectreactor:reactor-core` to an earlier release, especially when using Maven.
+This is the case for projects generated via Spring Initializr with Spring Boot 1.x, which will override the Reactor version to `2.0.8.RELEASE`.
+In such cases you must ensure that the proper version of the artifact is released.
+This can be simply achieved by adding a direct dependency on `io.projectreactor:reactor-core` with a version of `3.0.4.RELEASE` or later to your project.
====
[NOTE]
diff --git a/spring-cloud-stream-reactive/pom.xml b/spring-cloud-stream-reactive/pom.xml
index d8799e003..93e4b39a0 100644
--- a/spring-cloud-stream-reactive/pom.xml
+++ b/spring-cloud-stream-reactive/pom.xml
@@ -27,6 +27,11 @@
rxjavatrue
+
+ io.reactivex
+ rxjava-reactive-streams
+ ${rxjava-reactive-streams.version}
+ org.springframework.bootspring-boot-starter-test
@@ -44,4 +49,39 @@
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.4.3
+
+
+ package
+
+ shade
+
+
+
+
+ io.reactivex:rxjava-reactive-streams
+
+
+
+
+ rx.RxReactiveStreams
+ org.springframework.cloud.stream.reactive.shaded.rx.RxReactiveStreams
+
+
+ rx.internal
+ org.springframework.cloud.stream.reactive.shaded.rx.internal
+
+
+
+
+
+
+
+
+
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java
index 4d60f5466..c0e829476 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java
@@ -16,8 +16,8 @@
package org.springframework.cloud.stream.reactive;
-import reactor.adapter.RxJava1Adapter;
import rx.Observable;
+import rx.RxReactiveStreams;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
@@ -48,7 +48,7 @@ public class MessageChannelToInputObservableParameterAdapter
@Override
public Observable> adapt(final SubscribableChannel bindingTarget, MethodParameter parameter) {
- return RxJava1Adapter.publisherToObservable(
+ return RxReactiveStreams.toObservable(
this.messageChannelToInputFluxArgumentAdapter.adapt(bindingTarget, parameter));
}
}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java
index 48aa57bc1..ee0ee0ff7 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java
@@ -16,8 +16,10 @@
package org.springframework.cloud.stream.reactive;
-import reactor.adapter.RxJava1Adapter;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
import rx.Observable;
+import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
@@ -59,8 +61,9 @@ public class MessageChannelToObservableSenderParameterAdapter implements
@Override
public Single send(Observable> observable) {
- return RxJava1Adapter.publisherToSingle(
- this.fluxSender.send(RxJava1Adapter.observableToFlux(observable)));
+ Publisher> adaptedPublisher = RxReactiveStreams.toPublisher(observable);
+ return RxReactiveStreams.toSingle(
+ this.fluxSender.send(Flux.from(adaptedPublisher)));
}
};
}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java
index d7139e650..42c459a06 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java
@@ -16,8 +16,9 @@
package org.springframework.cloud.stream.reactive;
-import reactor.adapter.RxJava1Adapter;
+import reactor.core.publisher.Flux;
import rx.Observable;
+import rx.RxReactiveStreams;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.messaging.MessageChannel;
@@ -46,7 +47,7 @@ public class ObservableToMessageChannelResultAdapter
}
public void adapt(Observable> streamListenerResult, MessageChannel bindingTarget) {
- this.fluxToMessageChannelResultAdapter.adapt(RxJava1Adapter.observableToFlux(streamListenerResult),
+ this.fluxToMessageChannelResultAdapter.adapt(Flux.from(RxReactiveStreams.toPublisher(streamListenerResult)),
bindingTarget);
}
}
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
index f71deacb4..22194595c 100644
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
+++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java
@@ -16,20 +16,15 @@
package org.springframework.cloud.stream.reactive;
-import java.util.concurrent.ExecutorService;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import reactor.core.scheduler.Schedulers;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
-import org.springframework.cloud.stream.reactive.reactor.core.scheduler.NoInterruptOnCancelSchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.util.ClassUtils;
/**
* @author Marius Bogoevici
@@ -40,31 +35,6 @@ public class ReactiveSupportAutoConfiguration {
private static Log log = LogFactory.getLog(ReactiveSupportAutoConfiguration.class);
- static {
- try {
- // Override Schedulers.Factory for Reactor 3.0.0 to work around
- // https://github.com/reactor/reactor-core/issues/159
- // To be removed once Reactor 3.0.1+ is used
- Class> executorServiceSchedulerClass = ClassUtils.forName("reactor.core.scheduler.ExecutorServiceScheduler", null);
- try {
- // simple check that the construction version with the cancellation option is not on the classpath
- executorServiceSchedulerClass.getConstructor(ExecutorService.class, Boolean.class);
- }
- catch (NoSuchMethodException e) {
- if (log.isDebugEnabled()) {
- log.debug("Overriding Schedulers for Reactor");
- }
- Schedulers.setFactory(new NoInterruptOnCancelSchedulerFactory());
- }
- }
- catch (ClassNotFoundException e) {
- // Ignore if absent - means that we're on a different Reactor version than expected
- if (log.isInfoEnabled()) {
- log.info("Class reactor.core.scheduler.ExecutorServiceScheduler not found. Check Reactor version.");
- }
- }
- }
-
@Bean
public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter(
CompositeMessageConverterFactory compositeMessageConverterFactory) {
diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java
deleted file mode 100644
index 2403dc364..000000000
--- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java
+++ /dev/null
@@ -1,349 +0,0 @@
-/*
- * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import reactor.core.Cancellation;
-import reactor.core.Exceptions;
-import reactor.core.publisher.Operators;
-import reactor.core.scheduler.Scheduler;
-import reactor.util.concurrent.OpenHashSet;
-
-/**
- * Dynamically creates ExecutorService-based Workers and caches the thread pools, reusing
- * them once the Workers have been shut down.
- *
- * The maximum number of created thread pools is unbounded.
- *
- * The default time-to-live for unused thread pools is 60 seconds, use the
- * appropriate constructor to set a different value.
- *
- * This scheduler is not restartable (may be later).
- *
- * @author Stephane Maldini
- */
-final class ElasticScheduler implements Scheduler {
- static final AtomicLong COUNTER = new AtomicLong();
-
- static final ThreadFactory EVICTOR_FACTORY = r -> {
- Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet());
- t.setDaemon(true);
- return t;
- };
-
- final ThreadFactory factory;
-
- final int ttlSeconds;
-
- static final int DEFAULT_TTL_SECONDS = 60;
-
- final Queue cache;
-
- final Queue all;
-
- final ScheduledExecutorService evictor;
-
- static final ExecutorService SHUTDOWN;
-
- static {
- SHUTDOWN = Executors.newSingleThreadExecutor();
- SHUTDOWN.shutdownNow();
- }
-
- volatile boolean shutdown;
-
- public ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
- this.ttlSeconds = ttlSeconds;
- this.factory = factory;
- this.cache = new ConcurrentLinkedQueue<>();
- this.all = new ConcurrentLinkedQueue<>();
- this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
- this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS);
- }
-
- @Override
- public void start() {
- throw new UnsupportedOperationException("Restarting not supported yet");
- }
-
- @Override
- public void shutdown() {
- if (shutdown) {
- return;
- }
- shutdown = true;
-
- evictor.shutdownNow();
-
- cache.clear();
-
- ExecutorService exec;
-
- while ((exec = all.poll()) != null) {
- exec.shutdownNow();
- }
- }
-
- ExecutorService pick() {
- if (shutdown) {
- return SHUTDOWN;
- }
- ExecutorService result;
- ExecutorServiceExpiry e = cache.poll();
- if (e != null) {
- return e.executor;
- }
-
- result = Executors.newSingleThreadExecutor(factory);
- all.offer(result);
- if (shutdown) {
- all.remove(result);
- return SHUTDOWN;
- }
- return result;
- }
-
- @Override
- public Cancellation schedule(Runnable task) {
- ExecutorService exec = pick();
-
- Runnable wrapper = () -> {
- try {
- try {
- task.run();
- } catch (Throwable ex) {
- Exceptions.throwIfFatal(ex);
- Operators.onErrorDropped(ex);
- }
- } finally {
- release(exec);
- }
- };
- Future> f;
-
- try {
- f = exec.submit(wrapper);
- } catch (RejectedExecutionException ex) {
- Operators.onErrorDropped(ex);
- return REJECTED;
- }
- return () -> f.cancel(true);
- }
-
- @Override
- public Worker createWorker() {
- ExecutorService exec = pick();
- return new CachedWorker(exec, this);
- }
-
- void release(ExecutorService exec) {
- if (exec != SHUTDOWN && !shutdown) {
- ExecutorServiceExpiry e = new ExecutorServiceExpiry(exec, System.currentTimeMillis() + ttlSeconds * 1000L);
- cache.offer(e);
- if (shutdown) {
- if (cache.remove(e)) {
- exec.shutdownNow();
- }
- }
- }
- }
-
- void eviction() {
- long now = System.currentTimeMillis();
-
- List list = new ArrayList<>(cache);
- for (ExecutorServiceExpiry e : list) {
- if (e.expireMillis < now) {
- if (cache.remove(e)) {
- e.executor.shutdownNow();
- }
- }
- }
- }
-
- static final class ExecutorServiceExpiry {
- final ExecutorService executor;
- final long expireMillis;
-
- public ExecutorServiceExpiry(ExecutorService executor, long expireMillis) {
- this.executor = executor;
- this.expireMillis = expireMillis;
- }
- }
-
- static final class CachedWorker implements Worker {
-
- final ExecutorService executor;
-
- final ElasticScheduler parent;
-
- volatile boolean shutdown;
-
- OpenHashSet tasks;
-
- public CachedWorker(ExecutorService executor, ElasticScheduler parent) {
- this.executor = executor;
- this.parent = parent;
- this.tasks = new OpenHashSet<>();
- }
-
- @Override
- public Cancellation schedule(Runnable task) {
- if (shutdown) {
- return REJECTED;
- }
-
- CachedTask ct = new CachedTask(task, this);
-
- synchronized (this) {
- if (shutdown) {
- return REJECTED;
- }
- tasks.add(ct);
- }
-
- Future> f;
- try {
- f = executor.submit(ct);
- } catch (RejectedExecutionException ex) {
- Operators.onErrorDropped(ex);
- return REJECTED;
- }
-
- ct.setFuture(f);
-
- return ct;
- }
-
- @Override
- public void shutdown() {
- if (shutdown) {
- return;
- }
-
- OpenHashSet set;
- synchronized (this) {
- if (shutdown) {
- return;
- }
- shutdown = true;
- set = tasks;
- tasks = null;
- }
-
- if (!set.isEmpty()) {
- Object[] keys = set.keys();
- for (Object o : keys) {
- if (o != null) {
- ((CachedTask) o).cancelFuture();
- }
- }
- }
-
- parent.release(executor);
- }
-
- void remove(CachedTask task) {
- if (shutdown) {
- return;
- }
-
- synchronized (this) {
- if (shutdown) {
- return;
- }
- tasks.remove(task);
- }
- }
-
- static final class CachedTask
- extends AtomicReference>
- implements Runnable, Cancellation {
- /** */
- private static final long serialVersionUID = 6799295393954430738L;
-
- final Runnable run;
-
- final CachedWorker parent;
-
- volatile boolean cancelled;
-
- static final FutureTask