From 2abac42ebafa04f4d55e31c9911c4706ead3f170 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Thu, 5 Jan 2017 15:52:09 -0500 Subject: [PATCH] Update to Reactor 3.0.4 Fixes #751 - Use Reactor 3.0.4.RELEASE - Remove Scheduler implementations that were introduced for fixes - Shade `io.reactivex:rxjava-reactive-streams` classes to replace removed RxJava1 adapter support Signed-off-by: Marius Bogoevici --- pom.xml | 3 +- .../spring-cloud-stream-overview.adoc | 7 +- spring-cloud-stream-reactive/pom.xml | 40 ++ ...nnelToInputObservableParameterAdapter.java | 4 +- ...nelToObservableSenderParameterAdapter.java | 9 +- ...servableToMessageChannelResultAdapter.java | 5 +- .../ReactiveSupportAutoConfiguration.java | 30 -- .../core/scheduler/ElasticScheduler.java | 349 ------------- .../NoInterruptOnCancelSchedulerFactory.java | 55 --- .../core/scheduler/ParallelScheduler.java | 325 ------------ .../core/scheduler/SingleScheduler.java | 295 ----------- .../core/scheduler/SingleTimedScheduler.java | 465 ------------------ 12 files changed, 59 insertions(+), 1528 deletions(-) delete mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java delete mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java delete mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java delete mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java delete mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java diff --git a/pom.xml b/pom.xml index fe61cd377..1457177ab 100644 --- a/pom.xml +++ b/pom.xml @@ -21,9 +21,10 @@ 1.7 1.1.10 + 1.2.1 1.0.0.RELEASE 1.0.0.RELEASE - 3.0.2.RELEASE + 3.0.4.RELEASE 3.0.3 2.1 diff --git a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc index 61aa8d0bd..d61139d07 100644 --- a/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc +++ b/spring-cloud-stream-core-docs/src/main/asciidoc/spring-cloud-stream-overview.adoc @@ -502,7 +502,12 @@ Reactive programming support requires Java 1.8. [NOTE] ==== -Reactive programming support requires the use of Reactor 3.0.0 and higher. `spring-cloud-stream-reactive` will transitively retrieve the proper version, but it is possible for the project structure to manage the version of the `io.projectreactor:reactor-core` to an earlier release, especially when using Maven. This is the case for projects generated via Spring Initializr with Spring Boot 1.4, which will override the Reactor version to `2.0.8.RELEASE`. In such cases you must ensure that the proper version of the artifact is released. This can be simply achieved by adding a direct dependency on `io.projectreactor:reactor-core` with a version of `3.0.0.RC1` or later on your project. +As of Spring Cloud Stream 1.1.1 and later (starting with release train Brooklyn.SR2), reactive programming support requires the use of Reactor 3.0.4.RELEASE and higher. +Earlier Reactor versions (including 3.0.1.RELEASE, 3.0.2.RELEASE and 3.0.3.RELEASE) are not supported. +`spring-cloud-stream-reactive` will transitively retrieve the proper version, but it is possible for the project structure to manage the version of the `io.projectreactor:reactor-core` to an earlier release, especially when using Maven. +This is the case for projects generated via Spring Initializr with Spring Boot 1.x, which will override the Reactor version to `2.0.8.RELEASE`. +In such cases you must ensure that the proper version of the artifact is released. +This can be simply achieved by adding a direct dependency on `io.projectreactor:reactor-core` with a version of `3.0.4.RELEASE` or later to your project. ==== [NOTE] diff --git a/spring-cloud-stream-reactive/pom.xml b/spring-cloud-stream-reactive/pom.xml index d8799e003..93e4b39a0 100644 --- a/spring-cloud-stream-reactive/pom.xml +++ b/spring-cloud-stream-reactive/pom.xml @@ -27,6 +27,11 @@ rxjava true + + io.reactivex + rxjava-reactive-streams + ${rxjava-reactive-streams.version} + org.springframework.boot spring-boot-starter-test @@ -44,4 +49,39 @@ + + + + org.apache.maven.plugins + maven-shade-plugin + 2.4.3 + + + package + + shade + + + + + io.reactivex:rxjava-reactive-streams + + + + + rx.RxReactiveStreams + org.springframework.cloud.stream.reactive.shaded.rx.RxReactiveStreams + + + rx.internal + org.springframework.cloud.stream.reactive.shaded.rx.internal + + + + + + + + + diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java index 4d60f5466..c0e829476 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToInputObservableParameterAdapter.java @@ -16,8 +16,8 @@ package org.springframework.cloud.stream.reactive; -import reactor.adapter.RxJava1Adapter; import rx.Observable; +import rx.RxReactiveStreams; import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; import org.springframework.core.MethodParameter; @@ -48,7 +48,7 @@ public class MessageChannelToInputObservableParameterAdapter @Override public Observable adapt(final SubscribableChannel bindingTarget, MethodParameter parameter) { - return RxJava1Adapter.publisherToObservable( + return RxReactiveStreams.toObservable( this.messageChannelToInputFluxArgumentAdapter.adapt(bindingTarget, parameter)); } } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java index 48aa57bc1..ee0ee0ff7 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/MessageChannelToObservableSenderParameterAdapter.java @@ -16,8 +16,10 @@ package org.springframework.cloud.stream.reactive; -import reactor.adapter.RxJava1Adapter; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import rx.Observable; +import rx.RxReactiveStreams; import rx.Single; import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter; @@ -59,8 +61,9 @@ public class MessageChannelToObservableSenderParameterAdapter implements @Override public Single send(Observable observable) { - return RxJava1Adapter.publisherToSingle( - this.fluxSender.send(RxJava1Adapter.observableToFlux(observable))); + Publisher adaptedPublisher = RxReactiveStreams.toPublisher(observable); + return RxReactiveStreams.toSingle( + this.fluxSender.send(Flux.from(adaptedPublisher))); } }; } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java index d7139e650..42c459a06 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ObservableToMessageChannelResultAdapter.java @@ -16,8 +16,9 @@ package org.springframework.cloud.stream.reactive; -import reactor.adapter.RxJava1Adapter; +import reactor.core.publisher.Flux; import rx.Observable; +import rx.RxReactiveStreams; import org.springframework.cloud.stream.binding.StreamListenerResultAdapter; import org.springframework.messaging.MessageChannel; @@ -46,7 +47,7 @@ public class ObservableToMessageChannelResultAdapter } public void adapt(Observable streamListenerResult, MessageChannel bindingTarget) { - this.fluxToMessageChannelResultAdapter.adapt(RxJava1Adapter.observableToFlux(streamListenerResult), + this.fluxToMessageChannelResultAdapter.adapt(Flux.from(RxReactiveStreams.toPublisher(streamListenerResult)), bindingTarget); } } diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java index f71deacb4..22194595c 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java @@ -16,20 +16,15 @@ package org.springframework.cloud.stream.reactive; -import java.util.concurrent.ExecutorService; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import reactor.core.scheduler.Schedulers; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.binding.BindingService; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; -import org.springframework.cloud.stream.reactive.reactor.core.scheduler.NoInterruptOnCancelSchedulerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.util.ClassUtils; /** * @author Marius Bogoevici @@ -40,31 +35,6 @@ public class ReactiveSupportAutoConfiguration { private static Log log = LogFactory.getLog(ReactiveSupportAutoConfiguration.class); - static { - try { - // Override Schedulers.Factory for Reactor 3.0.0 to work around - // https://github.com/reactor/reactor-core/issues/159 - // To be removed once Reactor 3.0.1+ is used - Class executorServiceSchedulerClass = ClassUtils.forName("reactor.core.scheduler.ExecutorServiceScheduler", null); - try { - // simple check that the construction version with the cancellation option is not on the classpath - executorServiceSchedulerClass.getConstructor(ExecutorService.class, Boolean.class); - } - catch (NoSuchMethodException e) { - if (log.isDebugEnabled()) { - log.debug("Overriding Schedulers for Reactor"); - } - Schedulers.setFactory(new NoInterruptOnCancelSchedulerFactory()); - } - } - catch (ClassNotFoundException e) { - // Ignore if absent - means that we're on a different Reactor version than expected - if (log.isInfoEnabled()) { - log.info("Class reactor.core.scheduler.ExecutorServiceScheduler not found. Check Reactor version."); - } - } - } - @Bean public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter( CompositeMessageConverterFactory compositeMessageConverterFactory) { diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java deleted file mode 100644 index 2403dc364..000000000 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java +++ /dev/null @@ -1,349 +0,0 @@ -/* - * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.cloud.stream.reactive.reactor.core.scheduler; - -import java.util.ArrayList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import reactor.core.Cancellation; -import reactor.core.Exceptions; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.util.concurrent.OpenHashSet; - -/** - * Dynamically creates ExecutorService-based Workers and caches the thread pools, reusing - * them once the Workers have been shut down. - *

- * The maximum number of created thread pools is unbounded. - *

- * The default time-to-live for unused thread pools is 60 seconds, use the - * appropriate constructor to set a different value. - *

- * This scheduler is not restartable (may be later). - * - * @author Stephane Maldini - */ -final class ElasticScheduler implements Scheduler { - static final AtomicLong COUNTER = new AtomicLong(); - - static final ThreadFactory EVICTOR_FACTORY = r -> { - Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet()); - t.setDaemon(true); - return t; - }; - - final ThreadFactory factory; - - final int ttlSeconds; - - static final int DEFAULT_TTL_SECONDS = 60; - - final Queue cache; - - final Queue all; - - final ScheduledExecutorService evictor; - - static final ExecutorService SHUTDOWN; - - static { - SHUTDOWN = Executors.newSingleThreadExecutor(); - SHUTDOWN.shutdownNow(); - } - - volatile boolean shutdown; - - public ElasticScheduler(ThreadFactory factory, int ttlSeconds) { - this.ttlSeconds = ttlSeconds; - this.factory = factory; - this.cache = new ConcurrentLinkedQueue<>(); - this.all = new ConcurrentLinkedQueue<>(); - this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY); - this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS); - } - - @Override - public void start() { - throw new UnsupportedOperationException("Restarting not supported yet"); - } - - @Override - public void shutdown() { - if (shutdown) { - return; - } - shutdown = true; - - evictor.shutdownNow(); - - cache.clear(); - - ExecutorService exec; - - while ((exec = all.poll()) != null) { - exec.shutdownNow(); - } - } - - ExecutorService pick() { - if (shutdown) { - return SHUTDOWN; - } - ExecutorService result; - ExecutorServiceExpiry e = cache.poll(); - if (e != null) { - return e.executor; - } - - result = Executors.newSingleThreadExecutor(factory); - all.offer(result); - if (shutdown) { - all.remove(result); - return SHUTDOWN; - } - return result; - } - - @Override - public Cancellation schedule(Runnable task) { - ExecutorService exec = pick(); - - Runnable wrapper = () -> { - try { - try { - task.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - Operators.onErrorDropped(ex); - } - } finally { - release(exec); - } - }; - Future f; - - try { - f = exec.submit(wrapper); - } catch (RejectedExecutionException ex) { - Operators.onErrorDropped(ex); - return REJECTED; - } - return () -> f.cancel(true); - } - - @Override - public Worker createWorker() { - ExecutorService exec = pick(); - return new CachedWorker(exec, this); - } - - void release(ExecutorService exec) { - if (exec != SHUTDOWN && !shutdown) { - ExecutorServiceExpiry e = new ExecutorServiceExpiry(exec, System.currentTimeMillis() + ttlSeconds * 1000L); - cache.offer(e); - if (shutdown) { - if (cache.remove(e)) { - exec.shutdownNow(); - } - } - } - } - - void eviction() { - long now = System.currentTimeMillis(); - - List list = new ArrayList<>(cache); - for (ExecutorServiceExpiry e : list) { - if (e.expireMillis < now) { - if (cache.remove(e)) { - e.executor.shutdownNow(); - } - } - } - } - - static final class ExecutorServiceExpiry { - final ExecutorService executor; - final long expireMillis; - - public ExecutorServiceExpiry(ExecutorService executor, long expireMillis) { - this.executor = executor; - this.expireMillis = expireMillis; - } - } - - static final class CachedWorker implements Worker { - - final ExecutorService executor; - - final ElasticScheduler parent; - - volatile boolean shutdown; - - OpenHashSet tasks; - - public CachedWorker(ExecutorService executor, ElasticScheduler parent) { - this.executor = executor; - this.parent = parent; - this.tasks = new OpenHashSet<>(); - } - - @Override - public Cancellation schedule(Runnable task) { - if (shutdown) { - return REJECTED; - } - - CachedTask ct = new CachedTask(task, this); - - synchronized (this) { - if (shutdown) { - return REJECTED; - } - tasks.add(ct); - } - - Future f; - try { - f = executor.submit(ct); - } catch (RejectedExecutionException ex) { - Operators.onErrorDropped(ex); - return REJECTED; - } - - ct.setFuture(f); - - return ct; - } - - @Override - public void shutdown() { - if (shutdown) { - return; - } - - OpenHashSet set; - synchronized (this) { - if (shutdown) { - return; - } - shutdown = true; - set = tasks; - tasks = null; - } - - if (!set.isEmpty()) { - Object[] keys = set.keys(); - for (Object o : keys) { - if (o != null) { - ((CachedTask) o).cancelFuture(); - } - } - } - - parent.release(executor); - } - - void remove(CachedTask task) { - if (shutdown) { - return; - } - - synchronized (this) { - if (shutdown) { - return; - } - tasks.remove(task); - } - } - - static final class CachedTask - extends AtomicReference> - implements Runnable, Cancellation { - /** */ - private static final long serialVersionUID = 6799295393954430738L; - - final Runnable run; - - final CachedWorker parent; - - volatile boolean cancelled; - - static final FutureTask CANCELLED = new FutureTask<>(() -> { - }, null); - - static final FutureTask FINISHED = new FutureTask<>(() -> { - }, null); - - public CachedTask(Runnable run, CachedWorker parent) { - this.run = run; - this.parent = parent; - } - - @Override - public void run() { - try { - if (!parent.shutdown && !cancelled) { - run.run(); - } - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - Operators.onErrorDropped(ex); - } finally { - lazySet(FINISHED); - parent.remove(this); - } - } - - @Override - public void dispose() { - cancelled = true; - cancelFuture(); - } - - void setFuture(Future f) { - if (!compareAndSet(null, f)) { - if (get() != FINISHED) { - f.cancel(true); - } - } - } - - void cancelFuture() { - Future f = get(); - if (f != CANCELLED && f != FINISHED) { - f = getAndSet(CANCELLED); - if (f != null && f != CANCELLED && f != FINISHED) { - f.cancel(true); - } - } - } - } - } -} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java deleted file mode 100644 index 9613ea3d5..000000000 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2016 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.stream.reactive.reactor.core.scheduler; - -import java.util.concurrent.ThreadFactory; - -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; -import reactor.core.scheduler.TimedScheduler; - -/** - * {@link reactor.core.scheduler.Schedulers.Factory} implementation that pulls in - * {@link reactor.core.scheduler.Scheduler} implementations with fixes for - * . - * - * This is a temporary solution, until a Reactor release is available. - * - * @author Marius Bogoevici - */ -public class NoInterruptOnCancelSchedulerFactory implements Schedulers.Factory { - - @Override - public Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { - return new ElasticScheduler(threadFactory, ttlSeconds); - } - - @Override - public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { - return new ParallelScheduler(parallelism, threadFactory); - } - - @Override - public Scheduler newSingle(ThreadFactory threadFactory) { - return new SingleScheduler(threadFactory); - } - - @Override - public TimedScheduler newTimer(ThreadFactory threadFactory) { - return new SingleTimedScheduler(threadFactory); - } -} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java deleted file mode 100644 index ad72beb21..000000000 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java +++ /dev/null @@ -1,325 +0,0 @@ -/* - * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.cloud.stream.reactive.reactor.core.scheduler; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import reactor.core.Cancellation; -import reactor.core.Exceptions; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.util.concurrent.OpenHashSet; - -/** - * Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers - * and is suited for parallel work. - * - * @author Stephane Maldini - */ -final class ParallelScheduler implements Scheduler { - - static final AtomicLong COUNTER = new AtomicLong(); - - final int n; - - final ThreadFactory factory; - - volatile ExecutorService[] executors; - static final AtomicReferenceFieldUpdater EXECUTORS = - AtomicReferenceFieldUpdater.newUpdater(ParallelScheduler.class, ExecutorService[].class, "executors"); - - static final ExecutorService[] SHUTDOWN = new ExecutorService[0]; - - static final ExecutorService TERMINATED; - - static { - TERMINATED = Executors.newSingleThreadExecutor(); - TERMINATED.shutdownNow(); - } - - int roundRobin; - - ParallelScheduler(int n, ThreadFactory factory) { - if (n <= 0) { - throw new IllegalArgumentException("n > 0 required but it was " + n); - } - this.n = n; - this.factory = factory; - init(n); - } - - void init(int n) { - ExecutorService[] a = new ExecutorService[n]; - for (int i = 0; i < n; i++) { - a[i] = Executors.newSingleThreadExecutor(factory); - } - EXECUTORS.lazySet(this, a); - } - - @Override - public void start() { - ExecutorService[] b = null; - for (; ; ) { - ExecutorService[] a = executors; - if (a != SHUTDOWN) { - if (b != null) { - for (ExecutorService exec : b) { - exec.shutdownNow(); - } - } - return; - } - - if (b == null) { - b = new ExecutorService[n]; - for (int i = 0; i < n; i++) { - b[i] = Executors.newSingleThreadExecutor(factory); - } - } - - if (EXECUTORS.compareAndSet(this, a, b)) { - return; - } - } - } - - @Override - public void shutdown() { - ExecutorService[] a = executors; - if (a != SHUTDOWN) { - a = EXECUTORS.getAndSet(this, SHUTDOWN); - if (a != SHUTDOWN) { - for (ExecutorService exec : a) { - exec.shutdownNow(); - } - } - } - } - - ExecutorService pick() { - ExecutorService[] a = executors; - if (a != SHUTDOWN) { - // ignoring the race condition here, its already random who gets which executor - int idx = roundRobin; - if (idx == n) { - idx = 0; - roundRobin = 0; - } else { - roundRobin = idx + 1; - } - return a[idx]; - } - return TERMINATED; - } - - @Override - public Cancellation schedule(Runnable task) { - ExecutorService exec = pick(); - Future f = exec.submit(task); - return () -> f.cancel(false); - } - - @Override - public Worker createWorker() { - return new ParallelWorker(pick()); - } - - static final class ParallelWorker implements Worker { - final ExecutorService exec; - - OpenHashSet tasks; - - volatile boolean shutdown; - - public ParallelWorker(ExecutorService exec) { - this.exec = exec; - this.tasks = new OpenHashSet<>(); - } - - @Override - public Cancellation schedule(Runnable task) { - if (shutdown) { - return REJECTED; - } - - ParallelWorkerTask pw = new ParallelWorkerTask(task, this); - - synchronized (this) { - if (shutdown) { - return REJECTED; - } - tasks.add(pw); - } - - Future f; - try { - f = exec.submit(pw); - } catch (RejectedExecutionException ex) { - Operators.onErrorDropped(ex); - return REJECTED; - } - - if (shutdown) { - f.cancel(true); - return REJECTED; - } - - pw.setFuture(f); - - return pw; - } - - @Override - public void shutdown() { - if (shutdown) { - return; - } - shutdown = true; - OpenHashSet set; - synchronized (this) { - set = tasks; - tasks = null; - } - - if (set != null) { - Object[] a = set.keys(); - for (Object o : a) { - if (o != null) { - ((ParallelWorkerTask) o).cancelFuture(); - } - } - } - } - - void remove(ParallelWorkerTask task) { - if (shutdown) { - return; - } - - synchronized (this) { - if (shutdown) { - return; - } - tasks.remove(task); - } - } - - int pendingTasks() { - if (shutdown) { - return 0; - } - - synchronized (this) { - OpenHashSet set = tasks; - if (set != null) { - return set.size(); - } - return 0; - } - } - - static final class ParallelWorkerTask implements Runnable, Cancellation { - final Runnable run; - - final ParallelWorker parent; - - volatile boolean cancelled; - - volatile Future future; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater FUTURE = - AtomicReferenceFieldUpdater.newUpdater(ParallelWorkerTask.class, Future.class, "future"); - - static final Future FINISHED = CompletableFuture.completedFuture(null); - static final Future CANCELLED = CompletableFuture.completedFuture(null); - - public ParallelWorkerTask(Runnable run, ParallelWorker parent) { - this.run = run; - this.parent = parent; - } - - @Override - public void run() { - if (cancelled || parent.shutdown) { - return; - } - try { - try { - run.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - Operators.onErrorDropped(ex); - } - } finally { - for (; ; ) { - Future f = future; - if (f == CANCELLED) { - break; - } - if (FUTURE.compareAndSet(this, f, FINISHED)) { - parent.remove(this); - break; - } - } - } - } - - @Override - public void dispose() { - if (!cancelled) { - cancelled = true; - - Future f = future; - if (f != CANCELLED && f != FINISHED) { - f = FUTURE.getAndSet(this, CANCELLED); - if (f != CANCELLED && f != FINISHED) { - if (f != null) { - f.cancel(parent.shutdown); - } - - parent.remove(this); - } - } - } - } - - void setFuture(Future f) { - if (future != null || !FUTURE.compareAndSet(this, null, f)) { - if (future != FINISHED) { - f.cancel(false); - } - } - } - - void cancelFuture() { - Future f = future; - if (f != CANCELLED && f != FINISHED) { - f = FUTURE.getAndSet(this, CANCELLED); - if (f != null && f != CANCELLED && f != FINISHED) { - f.cancel(true); - } - } - } - } - } -} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java deleted file mode 100644 index aa3b77fc6..000000000 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.cloud.stream.reactive.reactor.core.scheduler; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import reactor.core.Cancellation; -import reactor.core.Exceptions; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.util.concurrent.OpenHashSet; - -/** - * Scheduler that works with a single-threaded ExecutorService and is suited for - * same-thread work (like an event dispatch thread). - * @author Stephane Maldini - */ -final class SingleScheduler implements Scheduler { - - static final AtomicLong COUNTER = new AtomicLong(); - - final ThreadFactory factory; - - volatile ExecutorService executor; - static final AtomicReferenceFieldUpdater EXECUTORS = - AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class, ExecutorService.class, "executor"); - - static final ExecutorService TERMINATED; - - static { - TERMINATED = Executors.newSingleThreadExecutor(); - TERMINATED.shutdownNow(); - } - - public SingleScheduler(ThreadFactory factory) { - this.factory = factory; - init(); - } - - private void init() { - EXECUTORS.lazySet(this, Executors.newSingleThreadExecutor(factory)); - } - - public boolean isStarted() { - return executor != TERMINATED; - } - - @Override - public void start() { - ExecutorService b = null; - for (; ; ) { - ExecutorService a = executor; - if (a != TERMINATED) { - if (b != null) { - b.shutdownNow(); - } - return; - } - - if (b == null) { - b = Executors.newSingleThreadExecutor(factory); - } - - if (EXECUTORS.compareAndSet(this, a, b)) { - return; - } - } - } - - @Override - public void shutdown() { - ExecutorService a = executor; - if (a != TERMINATED) { - a = EXECUTORS.getAndSet(this, TERMINATED); - if (a != TERMINATED) { - a.shutdownNow(); - } - } - } - - @Override - public Cancellation schedule(Runnable task) { - try { - Future f = executor.submit(task); - return () -> f.cancel(false); - } catch (RejectedExecutionException ex) { - Operators.onErrorDropped(ex); - return REJECTED; - } - } - - @Override - public Worker createWorker() { - return new SingleWorker(executor); - } - - static final class SingleWorker implements Worker { - final ExecutorService exec; - - OpenHashSet tasks; - - volatile boolean shutdown; - - public SingleWorker(ExecutorService exec) { - this.exec = exec; - this.tasks = new OpenHashSet<>(); - } - - @Override - public Cancellation schedule(Runnable task) { - if (shutdown) { - return REJECTED; - } - - SingleWorkerTask pw = new SingleWorkerTask(task, this); - - synchronized (this) { - if (shutdown) { - return REJECTED; - } - tasks.add(pw); - } - - Future f; - try { - f = exec.submit(pw); - } catch (RejectedExecutionException ex) { - Operators.onErrorDropped(ex); - return REJECTED; - } - - if (shutdown) { - f.cancel(true); - return REJECTED; - } - - pw.setFuture(f); - - return pw; - } - - @Override - public void shutdown() { - if (shutdown) { - return; - } - shutdown = true; - OpenHashSet set; - synchronized (this) { - set = tasks; - tasks = null; - } - - if (set != null && !set.isEmpty()) { - Object[] a = set.keys(); - for (Object o : a) { - if (o != null) { - ((SingleWorkerTask) o).cancelFuture(); - } - } - } - } - - void remove(SingleWorkerTask task) { - if (shutdown) { - return; - } - - synchronized (this) { - if (shutdown) { - return; - } - tasks.remove(task); - } - } - - int pendingTasks() { - if (shutdown) { - return 0; - } - - synchronized (this) { - OpenHashSet set = tasks; - if (set != null) { - return set.size(); - } - return 0; - } - } - - static final class SingleWorkerTask implements Runnable, Cancellation { - final Runnable run; - - final SingleWorker parent; - - volatile boolean cancelled; - - volatile Future future; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater FUTURE = - AtomicReferenceFieldUpdater.newUpdater(SingleWorkerTask.class, Future.class, "future"); - - static final Future FINISHED = CompletableFuture.completedFuture(null); - static final Future CANCELLED = CompletableFuture.completedFuture(null); - - public SingleWorkerTask(Runnable run, SingleWorker parent) { - this.run = run; - this.parent = parent; - } - - @Override - public void run() { - if (cancelled || parent.shutdown) { - return; - } - try { - try { - run.run(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - Operators.onErrorDropped(ex); - } - } finally { - for (; ; ) { - Future f = future; - if (f == CANCELLED) { - break; - } - if (FUTURE.compareAndSet(this, f, FINISHED)) { - parent.remove(this); - break; - } - } - } - } - - @Override - public void dispose() { - if (!cancelled) { - cancelled = true; - - Future f = future; - if (f != CANCELLED && f != FINISHED) { - f = FUTURE.getAndSet(this, CANCELLED); - if (f != CANCELLED && f != FINISHED) { - if (f != null) { - f.cancel(false); - } - - parent.remove(this); - } - } - } - } - - void setFuture(Future f) { - if (future != null || !FUTURE.compareAndSet(this, null, f)) { - if (future != FINISHED) { - f.cancel(false); - } - } - } - - void cancelFuture() { - Future f = future; - if (f != CANCELLED && f != FINISHED) { - f = FUTURE.getAndSet(this, CANCELLED); - if (f != null && f != CANCELLED && f != FINISHED) { - f.cancel(false); - } - } - } - } - } -} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java deleted file mode 100644 index 33ffdc8ce..000000000 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java +++ /dev/null @@ -1,465 +0,0 @@ -/* - * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.cloud.stream.reactive.reactor.core.scheduler; - -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - -import reactor.core.Cancellation; -import reactor.core.publisher.Operators; -import reactor.core.scheduler.TimedScheduler; -import reactor.util.concurrent.OpenHashSet; - -/** - * A TimedScheduler with an embedded, single-threaded ScheduledExecutorService, - * shared among all workers. - */ -final class SingleTimedScheduler implements TimedScheduler { - - static final AtomicLong COUNTER = new AtomicLong(); - - final ScheduledThreadPoolExecutor executor; - - /** - * Constructs a new SingleTimedScheduler with the given thread factory. - * - * @param threadFactory the thread factory to use - */ - SingleTimedScheduler(ThreadFactory threadFactory) { - ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, threadFactory); - e.setRemoveOnCancelPolicy(true); - executor = e; - } - - @Override - public Cancellation schedule(Runnable task) { - try { - Future f = executor.submit(task); - return () -> f.cancel(false); - } catch (RejectedExecutionException ex) { - return REJECTED; - } - } - - @Override - public Cancellation schedule(Runnable task, long delay, TimeUnit unit) { - try { - Future f = executor.schedule(task, delay, unit); - return () -> f.cancel(false); - } catch (RejectedExecutionException ex) { - return REJECTED; - } - } - - @Override - public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { - try { - Future f = executor.scheduleAtFixedRate(task, initialDelay, period, unit); - return () -> f.cancel(false); - } catch (RejectedExecutionException ex) { - return REJECTED; - } - } - - @Override - public void start() { - throw new UnsupportedOperationException("Not supported, yet."); - } - - @Override - public void shutdown() { - executor.shutdownNow(); - } - - @Override - public TimedWorker createWorker() { - return new SingleTimedSchedulerWorker(executor); - } - - static final class SingleTimedSchedulerWorker implements TimedWorker { - final ScheduledThreadPoolExecutor executor; - - OpenHashSet tasks; - - volatile boolean terminated; - - public SingleTimedSchedulerWorker(ScheduledThreadPoolExecutor executor) { - this.executor = executor; - this.tasks = new OpenHashSet<>(); - } - - @Override - public Cancellation schedule(Runnable task) { - if (terminated) { - return REJECTED; - } - - TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this); - - synchronized (this) { - if (terminated) { - return REJECTED; - } - - tasks.add(sr); - } - - try { - Future f = executor.submit(sr); - sr.set(f); - } catch (RejectedExecutionException ex) { - sr.dispose(); - return REJECTED; - } - - return sr; - } - - void delete(CancelFuture r) { - synchronized (this) { - if (!terminated) { - tasks.remove(r); - } - } - } - - @Override - public Cancellation schedule(Runnable task, long delay, TimeUnit unit) { - if (terminated) { - return REJECTED; - } - - TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this); - - synchronized (this) { - if (terminated) { - return REJECTED; - } - - tasks.add(sr); - } - - try { - Future f = executor.schedule(sr, delay, unit); - sr.set(f); - } catch (RejectedExecutionException ex) { - sr.dispose(); - return REJECTED; - } - - return sr; - } - - @Override - public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { - if (terminated) { - return REJECTED; - } - - TimedPeriodicScheduledRunnable sr = new TimedPeriodicScheduledRunnable(task, this); - - synchronized (this) { - if (terminated) { - return REJECTED; - } - - tasks.add(sr); - } - - try { - Future f = executor.scheduleAtFixedRate(sr, initialDelay, period, unit); - sr.set(f); - } catch (RejectedExecutionException ex) { - sr.dispose(); - return REJECTED; - } - - return sr; - } - - @Override - public void shutdown() { - if (terminated) { - return; - } - terminated = true; - - OpenHashSet set; - - synchronized (this) { - set = tasks; - if (set == null) { - return; - } - tasks = null; - } - - if (!set.isEmpty()) { - Object[] keys = set.keys(); - for (Object c : keys) { - if (c != null) { - ((CancelFuture) c).cancelFuture(); - } - } - } - } - } - - interface CancelFuture { - void cancelFuture(); - } - - static final class TimedScheduledRunnable - extends AtomicReference> implements Runnable, Cancellation, CancelFuture { - /** */ - private static final long serialVersionUID = 2284024836904862408L; - - final Runnable task; - - final SingleTimedSchedulerWorker parent; - - volatile Thread current; - static final AtomicReferenceFieldUpdater CURRENT = - AtomicReferenceFieldUpdater.newUpdater(TimedScheduledRunnable.class, Thread.class, "current"); - - static final Runnable EMPTY = new Runnable() { - @Override - public void run() { - - } - }; - - static final Future CANCELLED_FUTURE = new FutureTask<>(EMPTY, null); - - static final Future FINISHED = new FutureTask<>(EMPTY, null); - - public TimedScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) { - this.task = task; - this.parent = parent; - } - - @Override - public void run() { - CURRENT.lazySet(this, Thread.currentThread()); - try { - try { - task.run(); - } catch (Throwable e) { - Operators.onErrorDropped(e); - } - } finally { - for (; ; ) { - Future a = get(); - if (a == CANCELLED_FUTURE) { - break; - } - if (compareAndSet(a, FINISHED)) { - if (a != null) { - doCancel(a); - } - parent.delete(this); - break; - } - } - CURRENT.lazySet(this, null); - } - } - - void doCancel(Future a) { - a.cancel(Thread.currentThread() != current); - } - - @Override - public void cancelFuture() { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (compareAndSet(a, CANCELLED_FUTURE)) { - if (a != null) { - doCancel(a); - } - return; - } - } - } - - @Override - public void dispose() { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (compareAndSet(a, CANCELLED_FUTURE)) { - if (a != null) { - doCancel(a); - } - parent.delete(this); - return; - } - } - } - - - void setFuture(Future f) { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (a == CANCELLED_FUTURE) { - doCancel(a); - return; - } - if (compareAndSet(null, f)) { - return; - } - } - } - - @Override - public String toString() { - return "TimedScheduledRunnable[cancelled=" + (get() == CANCELLED_FUTURE) + - ", task=" + task + - "]"; - } - } - - static final class TimedPeriodicScheduledRunnable - extends AtomicReference> - implements Runnable, Cancellation, CancelFuture { - /** */ - private static final long serialVersionUID = 2284024836904862408L; - - final Runnable task; - - final SingleTimedSchedulerWorker parent; - - volatile Thread current; - static final AtomicReferenceFieldUpdater CURRENT = - AtomicReferenceFieldUpdater.newUpdater(TimedPeriodicScheduledRunnable.class, Thread.class, "current"); - - static final Runnable EMPTY = new Runnable() { - @Override - public void run() { - - } - }; - - static final Future CANCELLED_FUTURE = new FutureTask<>(EMPTY, null); - - static final Future FINISHED = new FutureTask<>(EMPTY, null); - - public TimedPeriodicScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) { - this.task = task; - this.parent = parent; - } - - @Override - public void run() { - CURRENT.lazySet(this, Thread.currentThread()); - try { - try { - task.run(); - } catch (Throwable ex) { - Operators.onErrorDropped(ex); - for (; ; ) { - Future a = get(); - if (a == CANCELLED_FUTURE) { - break; - } - if (compareAndSet(a, FINISHED)) { - parent.delete(this); - break; - } - } - } - } finally { - CURRENT.lazySet(this, null); - } - } - - void doCancel(Future a) { - a.cancel(false); - } - - @Override - public void cancelFuture() { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (compareAndSet(a, CANCELLED_FUTURE)) { - if (a != null) { - doCancel(a); - } - return; - } - } - } - - @Override - public void dispose() { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (compareAndSet(a, CANCELLED_FUTURE)) { - if (a != null) { - doCancel(a); - } - parent.delete(this); - return; - } - } - } - - - void setFuture(Future f) { - for (; ; ) { - Future a = get(); - if (a == FINISHED) { - return; - } - if (a == CANCELLED_FUTURE) { - doCancel(a); - return; - } - if (compareAndSet(null, f)) { - return; - } - } - } - - @Override - public String toString() { - return "TimedPeriodicScheduledRunnable[cancelled=" + get() + ", task=" + task + "]"; - } - } - -}