Update to Reactor 3.0.4

Fixes #751

- Use Reactor 3.0.4.RELEASE
- Remove Scheduler implementations that were introduced for fixes
- Shade `io.reactivex:rxjava-reactive-streams` classes to replace removed RxJava1 adapter support

Signed-off-by: Marius Bogoevici <mbogoevici@pivotal.io>
This commit is contained in:
Marius Bogoevici
2017-01-05 15:52:09 -05:00
parent 98431a6d25
commit 2abac42eba
12 changed files with 59 additions and 1528 deletions

View File

@@ -27,6 +27,11 @@
<artifactId>rxjava</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava-reactive-streams</artifactId>
<version>${rxjava-reactive-streams.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
@@ -44,4 +49,39 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes>
<include>io.reactivex:rxjava-reactive-streams</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>rx.RxReactiveStreams</pattern>
<shadedPattern>org.springframework.cloud.stream.reactive.shaded.rx.RxReactiveStreams</shadedPattern>
</relocation>
<relocation>
<pattern>rx.internal</pattern>
<shadedPattern>org.springframework.cloud.stream.reactive.shaded.rx.internal</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -16,8 +16,8 @@
package org.springframework.cloud.stream.reactive;
import reactor.adapter.RxJava1Adapter;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.core.MethodParameter;
@@ -48,7 +48,7 @@ public class MessageChannelToInputObservableParameterAdapter
@Override
public Observable<?> adapt(final SubscribableChannel bindingTarget, MethodParameter parameter) {
return RxJava1Adapter.publisherToObservable(
return RxReactiveStreams.toObservable(
this.messageChannelToInputFluxArgumentAdapter.adapt(bindingTarget, parameter));
}
}

View File

@@ -16,8 +16,10 @@
package org.springframework.cloud.stream.reactive;
import reactor.adapter.RxJava1Adapter;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
@@ -59,8 +61,9 @@ public class MessageChannelToObservableSenderParameterAdapter implements
@Override
public Single<Void> send(Observable<?> observable) {
return RxJava1Adapter.publisherToSingle(
this.fluxSender.send(RxJava1Adapter.observableToFlux(observable)));
Publisher<?> adaptedPublisher = RxReactiveStreams.toPublisher(observable);
return RxReactiveStreams.toSingle(
this.fluxSender.send(Flux.from(adaptedPublisher)));
}
};
}

View File

@@ -16,8 +16,9 @@
package org.springframework.cloud.stream.reactive;
import reactor.adapter.RxJava1Adapter;
import reactor.core.publisher.Flux;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.messaging.MessageChannel;
@@ -46,7 +47,7 @@ public class ObservableToMessageChannelResultAdapter
}
public void adapt(Observable<?> streamListenerResult, MessageChannel bindingTarget) {
this.fluxToMessageChannelResultAdapter.adapt(RxJava1Adapter.observableToFlux(streamListenerResult),
this.fluxToMessageChannelResultAdapter.adapt(Flux.from(RxReactiveStreams.toPublisher(streamListenerResult)),
bindingTarget);
}
}

View File

@@ -16,20 +16,15 @@
package org.springframework.cloud.stream.reactive;
import java.util.concurrent.ExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.scheduler.Schedulers;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.cloud.stream.reactive.reactor.core.scheduler.NoInterruptOnCancelSchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ClassUtils;
/**
* @author Marius Bogoevici
@@ -40,31 +35,6 @@ public class ReactiveSupportAutoConfiguration {
private static Log log = LogFactory.getLog(ReactiveSupportAutoConfiguration.class);
static {
try {
// Override Schedulers.Factory for Reactor 3.0.0 to work around
// https://github.com/reactor/reactor-core/issues/159
// To be removed once Reactor 3.0.1+ is used
Class<?> executorServiceSchedulerClass = ClassUtils.forName("reactor.core.scheduler.ExecutorServiceScheduler", null);
try {
// simple check that the construction version with the cancellation option is not on the classpath
executorServiceSchedulerClass.getConstructor(ExecutorService.class, Boolean.class);
}
catch (NoSuchMethodException e) {
if (log.isDebugEnabled()) {
log.debug("Overriding Schedulers for Reactor");
}
Schedulers.setFactory(new NoInterruptOnCancelSchedulerFactory());
}
}
catch (ClassNotFoundException e) {
// Ignore if absent - means that we're on a different Reactor version than expected
if (log.isInfoEnabled()) {
log.info("Class reactor.core.scheduler.ExecutorServiceScheduler not found. Check Reactor version.");
}
}
}
@Bean
public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter(
CompositeMessageConverterFactory compositeMessageConverterFactory) {

View File

@@ -1,349 +0,0 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.Cancellation;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.OpenHashSet;
/**
* Dynamically creates ExecutorService-based Workers and caches the thread pools, reusing
* them once the Workers have been shut down.
* <p>
* The maximum number of created thread pools is unbounded.
* <p>
* The default time-to-live for unused thread pools is 60 seconds, use the
* appropriate constructor to set a different value.
* <p>
* This scheduler is not restartable (may be later).
*
* @author Stephane Maldini
*/
final class ElasticScheduler implements Scheduler {
static final AtomicLong COUNTER = new AtomicLong();
static final ThreadFactory EVICTOR_FACTORY = r -> {
Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet());
t.setDaemon(true);
return t;
};
final ThreadFactory factory;
final int ttlSeconds;
static final int DEFAULT_TTL_SECONDS = 60;
final Queue<ExecutorServiceExpiry> cache;
final Queue<ExecutorService> all;
final ScheduledExecutorService evictor;
static final ExecutorService SHUTDOWN;
static {
SHUTDOWN = Executors.newSingleThreadExecutor();
SHUTDOWN.shutdownNow();
}
volatile boolean shutdown;
public ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
this.ttlSeconds = ttlSeconds;
this.factory = factory;
this.cache = new ConcurrentLinkedQueue<>();
this.all = new ConcurrentLinkedQueue<>();
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS);
}
@Override
public void start() {
throw new UnsupportedOperationException("Restarting not supported yet");
}
@Override
public void shutdown() {
if (shutdown) {
return;
}
shutdown = true;
evictor.shutdownNow();
cache.clear();
ExecutorService exec;
while ((exec = all.poll()) != null) {
exec.shutdownNow();
}
}
ExecutorService pick() {
if (shutdown) {
return SHUTDOWN;
}
ExecutorService result;
ExecutorServiceExpiry e = cache.poll();
if (e != null) {
return e.executor;
}
result = Executors.newSingleThreadExecutor(factory);
all.offer(result);
if (shutdown) {
all.remove(result);
return SHUTDOWN;
}
return result;
}
@Override
public Cancellation schedule(Runnable task) {
ExecutorService exec = pick();
Runnable wrapper = () -> {
try {
try {
task.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Operators.onErrorDropped(ex);
}
} finally {
release(exec);
}
};
Future<?> f;
try {
f = exec.submit(wrapper);
} catch (RejectedExecutionException ex) {
Operators.onErrorDropped(ex);
return REJECTED;
}
return () -> f.cancel(true);
}
@Override
public Worker createWorker() {
ExecutorService exec = pick();
return new CachedWorker(exec, this);
}
void release(ExecutorService exec) {
if (exec != SHUTDOWN && !shutdown) {
ExecutorServiceExpiry e = new ExecutorServiceExpiry(exec, System.currentTimeMillis() + ttlSeconds * 1000L);
cache.offer(e);
if (shutdown) {
if (cache.remove(e)) {
exec.shutdownNow();
}
}
}
}
void eviction() {
long now = System.currentTimeMillis();
List<ExecutorServiceExpiry> list = new ArrayList<>(cache);
for (ExecutorServiceExpiry e : list) {
if (e.expireMillis < now) {
if (cache.remove(e)) {
e.executor.shutdownNow();
}
}
}
}
static final class ExecutorServiceExpiry {
final ExecutorService executor;
final long expireMillis;
public ExecutorServiceExpiry(ExecutorService executor, long expireMillis) {
this.executor = executor;
this.expireMillis = expireMillis;
}
}
static final class CachedWorker implements Worker {
final ExecutorService executor;
final ElasticScheduler parent;
volatile boolean shutdown;
OpenHashSet<CachedTask> tasks;
public CachedWorker(ExecutorService executor, ElasticScheduler parent) {
this.executor = executor;
this.parent = parent;
this.tasks = new OpenHashSet<>();
}
@Override
public Cancellation schedule(Runnable task) {
if (shutdown) {
return REJECTED;
}
CachedTask ct = new CachedTask(task, this);
synchronized (this) {
if (shutdown) {
return REJECTED;
}
tasks.add(ct);
}
Future<?> f;
try {
f = executor.submit(ct);
} catch (RejectedExecutionException ex) {
Operators.onErrorDropped(ex);
return REJECTED;
}
ct.setFuture(f);
return ct;
}
@Override
public void shutdown() {
if (shutdown) {
return;
}
OpenHashSet<CachedTask> set;
synchronized (this) {
if (shutdown) {
return;
}
shutdown = true;
set = tasks;
tasks = null;
}
if (!set.isEmpty()) {
Object[] keys = set.keys();
for (Object o : keys) {
if (o != null) {
((CachedTask) o).cancelFuture();
}
}
}
parent.release(executor);
}
void remove(CachedTask task) {
if (shutdown) {
return;
}
synchronized (this) {
if (shutdown) {
return;
}
tasks.remove(task);
}
}
static final class CachedTask
extends AtomicReference<Future<?>>
implements Runnable, Cancellation {
/** */
private static final long serialVersionUID = 6799295393954430738L;
final Runnable run;
final CachedWorker parent;
volatile boolean cancelled;
static final FutureTask<Object> CANCELLED = new FutureTask<>(() -> {
}, null);
static final FutureTask<Object> FINISHED = new FutureTask<>(() -> {
}, null);
public CachedTask(Runnable run, CachedWorker parent) {
this.run = run;
this.parent = parent;
}
@Override
public void run() {
try {
if (!parent.shutdown && !cancelled) {
run.run();
}
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Operators.onErrorDropped(ex);
} finally {
lazySet(FINISHED);
parent.remove(this);
}
}
@Override
public void dispose() {
cancelled = true;
cancelFuture();
}
void setFuture(Future<?> f) {
if (!compareAndSet(null, f)) {
if (get() != FINISHED) {
f.cancel(true);
}
}
}
void cancelFuture() {
Future<?> f = get();
if (f != CANCELLED && f != FINISHED) {
f = getAndSet(CANCELLED);
if (f != null && f != CANCELLED && f != FINISHED) {
f.cancel(true);
}
}
}
}
}
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
import java.util.concurrent.ThreadFactory;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.core.scheduler.TimedScheduler;
/**
* {@link reactor.core.scheduler.Schedulers.Factory} implementation that pulls in
* {@link reactor.core.scheduler.Scheduler} implementations with fixes for
* <a href="https://github.com/reactor/reactor-core/issues/159"/>.
*
* This is a temporary solution, until a Reactor release is available.
*
* @author Marius Bogoevici
*/
public class NoInterruptOnCancelSchedulerFactory implements Schedulers.Factory {
@Override
public Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
return new ElasticScheduler(threadFactory, ttlSeconds);
}
@Override
public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
return new ParallelScheduler(parallelism, threadFactory);
}
@Override
public Scheduler newSingle(ThreadFactory threadFactory) {
return new SingleScheduler(threadFactory);
}
@Override
public TimedScheduler newTimer(ThreadFactory threadFactory) {
return new SingleTimedScheduler(threadFactory);
}
}

View File

@@ -1,325 +0,0 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import reactor.core.Cancellation;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.OpenHashSet;
/**
* Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers
* and is suited for parallel work.
*
* @author Stephane Maldini
*/
final class ParallelScheduler implements Scheduler {
static final AtomicLong COUNTER = new AtomicLong();
final int n;
final ThreadFactory factory;
volatile ExecutorService[] executors;
static final AtomicReferenceFieldUpdater<ParallelScheduler, ExecutorService[]> EXECUTORS =
AtomicReferenceFieldUpdater.newUpdater(ParallelScheduler.class, ExecutorService[].class, "executors");
static final ExecutorService[] SHUTDOWN = new ExecutorService[0];
static final ExecutorService TERMINATED;
static {
TERMINATED = Executors.newSingleThreadExecutor();
TERMINATED.shutdownNow();
}
int roundRobin;
ParallelScheduler(int n, ThreadFactory factory) {
if (n <= 0) {
throw new IllegalArgumentException("n > 0 required but it was " + n);
}
this.n = n;
this.factory = factory;
init(n);
}
void init(int n) {
ExecutorService[] a = new ExecutorService[n];
for (int i = 0; i < n; i++) {
a[i] = Executors.newSingleThreadExecutor(factory);
}
EXECUTORS.lazySet(this, a);
}
@Override
public void start() {
ExecutorService[] b = null;
for (; ; ) {
ExecutorService[] a = executors;
if (a != SHUTDOWN) {
if (b != null) {
for (ExecutorService exec : b) {
exec.shutdownNow();
}
}
return;
}
if (b == null) {
b = new ExecutorService[n];
for (int i = 0; i < n; i++) {
b[i] = Executors.newSingleThreadExecutor(factory);
}
}
if (EXECUTORS.compareAndSet(this, a, b)) {
return;
}
}
}
@Override
public void shutdown() {
ExecutorService[] a = executors;
if (a != SHUTDOWN) {
a = EXECUTORS.getAndSet(this, SHUTDOWN);
if (a != SHUTDOWN) {
for (ExecutorService exec : a) {
exec.shutdownNow();
}
}
}
}
ExecutorService pick() {
ExecutorService[] a = executors;
if (a != SHUTDOWN) {
// ignoring the race condition here, its already random who gets which executor
int idx = roundRobin;
if (idx == n) {
idx = 0;
roundRobin = 0;
} else {
roundRobin = idx + 1;
}
return a[idx];
}
return TERMINATED;
}
@Override
public Cancellation schedule(Runnable task) {
ExecutorService exec = pick();
Future<?> f = exec.submit(task);
return () -> f.cancel(false);
}
@Override
public Worker createWorker() {
return new ParallelWorker(pick());
}
static final class ParallelWorker implements Worker {
final ExecutorService exec;
OpenHashSet<ParallelWorkerTask> tasks;
volatile boolean shutdown;
public ParallelWorker(ExecutorService exec) {
this.exec = exec;
this.tasks = new OpenHashSet<>();
}
@Override
public Cancellation schedule(Runnable task) {
if (shutdown) {
return REJECTED;
}
ParallelWorkerTask pw = new ParallelWorkerTask(task, this);
synchronized (this) {
if (shutdown) {
return REJECTED;
}
tasks.add(pw);
}
Future<?> f;
try {
f = exec.submit(pw);
} catch (RejectedExecutionException ex) {
Operators.onErrorDropped(ex);
return REJECTED;
}
if (shutdown) {
f.cancel(true);
return REJECTED;
}
pw.setFuture(f);
return pw;
}
@Override
public void shutdown() {
if (shutdown) {
return;
}
shutdown = true;
OpenHashSet<ParallelWorkerTask> set;
synchronized (this) {
set = tasks;
tasks = null;
}
if (set != null) {
Object[] a = set.keys();
for (Object o : a) {
if (o != null) {
((ParallelWorkerTask) o).cancelFuture();
}
}
}
}
void remove(ParallelWorkerTask task) {
if (shutdown) {
return;
}
synchronized (this) {
if (shutdown) {
return;
}
tasks.remove(task);
}
}
int pendingTasks() {
if (shutdown) {
return 0;
}
synchronized (this) {
OpenHashSet<?> set = tasks;
if (set != null) {
return set.size();
}
return 0;
}
}
static final class ParallelWorkerTask implements Runnable, Cancellation {
final Runnable run;
final ParallelWorker parent;
volatile boolean cancelled;
volatile Future<?> future;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<ParallelWorkerTask, Future> FUTURE =
AtomicReferenceFieldUpdater.newUpdater(ParallelWorkerTask.class, Future.class, "future");
static final Future<Object> FINISHED = CompletableFuture.completedFuture(null);
static final Future<Object> CANCELLED = CompletableFuture.completedFuture(null);
public ParallelWorkerTask(Runnable run, ParallelWorker parent) {
this.run = run;
this.parent = parent;
}
@Override
public void run() {
if (cancelled || parent.shutdown) {
return;
}
try {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Operators.onErrorDropped(ex);
}
} finally {
for (; ; ) {
Future<?> f = future;
if (f == CANCELLED) {
break;
}
if (FUTURE.compareAndSet(this, f, FINISHED)) {
parent.remove(this);
break;
}
}
}
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
Future<?> f = future;
if (f != CANCELLED && f != FINISHED) {
f = FUTURE.getAndSet(this, CANCELLED);
if (f != CANCELLED && f != FINISHED) {
if (f != null) {
f.cancel(parent.shutdown);
}
parent.remove(this);
}
}
}
}
void setFuture(Future<?> f) {
if (future != null || !FUTURE.compareAndSet(this, null, f)) {
if (future != FINISHED) {
f.cancel(false);
}
}
}
void cancelFuture() {
Future<?> f = future;
if (f != CANCELLED && f != FINISHED) {
f = FUTURE.getAndSet(this, CANCELLED);
if (f != null && f != CANCELLED && f != FINISHED) {
f.cancel(true);
}
}
}
}
}
}

View File

@@ -1,295 +0,0 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import reactor.core.Cancellation;
import reactor.core.Exceptions;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.OpenHashSet;
/**
* Scheduler that works with a single-threaded ExecutorService and is suited for
* same-thread work (like an event dispatch thread).
* @author Stephane Maldini
*/
final class SingleScheduler implements Scheduler {
static final AtomicLong COUNTER = new AtomicLong();
final ThreadFactory factory;
volatile ExecutorService executor;
static final AtomicReferenceFieldUpdater<SingleScheduler, ExecutorService> EXECUTORS =
AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class, ExecutorService.class, "executor");
static final ExecutorService TERMINATED;
static {
TERMINATED = Executors.newSingleThreadExecutor();
TERMINATED.shutdownNow();
}
public SingleScheduler(ThreadFactory factory) {
this.factory = factory;
init();
}
private void init() {
EXECUTORS.lazySet(this, Executors.newSingleThreadExecutor(factory));
}
public boolean isStarted() {
return executor != TERMINATED;
}
@Override
public void start() {
ExecutorService b = null;
for (; ; ) {
ExecutorService a = executor;
if (a != TERMINATED) {
if (b != null) {
b.shutdownNow();
}
return;
}
if (b == null) {
b = Executors.newSingleThreadExecutor(factory);
}
if (EXECUTORS.compareAndSet(this, a, b)) {
return;
}
}
}
@Override
public void shutdown() {
ExecutorService a = executor;
if (a != TERMINATED) {
a = EXECUTORS.getAndSet(this, TERMINATED);
if (a != TERMINATED) {
a.shutdownNow();
}
}
}
@Override
public Cancellation schedule(Runnable task) {
try {
Future<?> f = executor.submit(task);
return () -> f.cancel(false);
} catch (RejectedExecutionException ex) {
Operators.onErrorDropped(ex);
return REJECTED;
}
}
@Override
public Worker createWorker() {
return new SingleWorker(executor);
}
static final class SingleWorker implements Worker {
final ExecutorService exec;
OpenHashSet<SingleWorkerTask> tasks;
volatile boolean shutdown;
public SingleWorker(ExecutorService exec) {
this.exec = exec;
this.tasks = new OpenHashSet<>();
}
@Override
public Cancellation schedule(Runnable task) {
if (shutdown) {
return REJECTED;
}
SingleWorkerTask pw = new SingleWorkerTask(task, this);
synchronized (this) {
if (shutdown) {
return REJECTED;
}
tasks.add(pw);
}
Future<?> f;
try {
f = exec.submit(pw);
} catch (RejectedExecutionException ex) {
Operators.onErrorDropped(ex);
return REJECTED;
}
if (shutdown) {
f.cancel(true);
return REJECTED;
}
pw.setFuture(f);
return pw;
}
@Override
public void shutdown() {
if (shutdown) {
return;
}
shutdown = true;
OpenHashSet<SingleWorkerTask> set;
synchronized (this) {
set = tasks;
tasks = null;
}
if (set != null && !set.isEmpty()) {
Object[] a = set.keys();
for (Object o : a) {
if (o != null) {
((SingleWorkerTask) o).cancelFuture();
}
}
}
}
void remove(SingleWorkerTask task) {
if (shutdown) {
return;
}
synchronized (this) {
if (shutdown) {
return;
}
tasks.remove(task);
}
}
int pendingTasks() {
if (shutdown) {
return 0;
}
synchronized (this) {
OpenHashSet<?> set = tasks;
if (set != null) {
return set.size();
}
return 0;
}
}
static final class SingleWorkerTask implements Runnable, Cancellation {
final Runnable run;
final SingleWorker parent;
volatile boolean cancelled;
volatile Future<?> future;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<SingleWorkerTask, Future> FUTURE =
AtomicReferenceFieldUpdater.newUpdater(SingleWorkerTask.class, Future.class, "future");
static final Future<Object> FINISHED = CompletableFuture.completedFuture(null);
static final Future<Object> CANCELLED = CompletableFuture.completedFuture(null);
public SingleWorkerTask(Runnable run, SingleWorker parent) {
this.run = run;
this.parent = parent;
}
@Override
public void run() {
if (cancelled || parent.shutdown) {
return;
}
try {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
Operators.onErrorDropped(ex);
}
} finally {
for (; ; ) {
Future<?> f = future;
if (f == CANCELLED) {
break;
}
if (FUTURE.compareAndSet(this, f, FINISHED)) {
parent.remove(this);
break;
}
}
}
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
Future<?> f = future;
if (f != CANCELLED && f != FINISHED) {
f = FUTURE.getAndSet(this, CANCELLED);
if (f != CANCELLED && f != FINISHED) {
if (f != null) {
f.cancel(false);
}
parent.remove(this);
}
}
}
}
void setFuture(Future<?> f) {
if (future != null || !FUTURE.compareAndSet(this, null, f)) {
if (future != FINISHED) {
f.cancel(false);
}
}
}
void cancelFuture() {
Future<?> f = future;
if (f != CANCELLED && f != FINISHED) {
f = FUTURE.getAndSet(this, CANCELLED);
if (f != null && f != CANCELLED && f != FINISHED) {
f.cancel(false);
}
}
}
}
}
}

View File

@@ -1,465 +0,0 @@
/*
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import reactor.core.Cancellation;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.TimedScheduler;
import reactor.util.concurrent.OpenHashSet;
/**
* A TimedScheduler with an embedded, single-threaded ScheduledExecutorService,
* shared among all workers.
*/
final class SingleTimedScheduler implements TimedScheduler {
static final AtomicLong COUNTER = new AtomicLong();
final ScheduledThreadPoolExecutor executor;
/**
* Constructs a new SingleTimedScheduler with the given thread factory.
*
* @param threadFactory the thread factory to use
*/
SingleTimedScheduler(ThreadFactory threadFactory) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, threadFactory);
e.setRemoveOnCancelPolicy(true);
executor = e;
}
@Override
public Cancellation schedule(Runnable task) {
try {
Future<?> f = executor.submit(task);
return () -> f.cancel(false);
} catch (RejectedExecutionException ex) {
return REJECTED;
}
}
@Override
public Cancellation schedule(Runnable task, long delay, TimeUnit unit) {
try {
Future<?> f = executor.schedule(task, delay, unit);
return () -> f.cancel(false);
} catch (RejectedExecutionException ex) {
return REJECTED;
}
}
@Override
public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
return () -> f.cancel(false);
} catch (RejectedExecutionException ex) {
return REJECTED;
}
}
@Override
public void start() {
throw new UnsupportedOperationException("Not supported, yet.");
}
@Override
public void shutdown() {
executor.shutdownNow();
}
@Override
public TimedWorker createWorker() {
return new SingleTimedSchedulerWorker(executor);
}
static final class SingleTimedSchedulerWorker implements TimedWorker {
final ScheduledThreadPoolExecutor executor;
OpenHashSet<CancelFuture> tasks;
volatile boolean terminated;
public SingleTimedSchedulerWorker(ScheduledThreadPoolExecutor executor) {
this.executor = executor;
this.tasks = new OpenHashSet<>();
}
@Override
public Cancellation schedule(Runnable task) {
if (terminated) {
return REJECTED;
}
TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this);
synchronized (this) {
if (terminated) {
return REJECTED;
}
tasks.add(sr);
}
try {
Future<?> f = executor.submit(sr);
sr.set(f);
} catch (RejectedExecutionException ex) {
sr.dispose();
return REJECTED;
}
return sr;
}
void delete(CancelFuture r) {
synchronized (this) {
if (!terminated) {
tasks.remove(r);
}
}
}
@Override
public Cancellation schedule(Runnable task, long delay, TimeUnit unit) {
if (terminated) {
return REJECTED;
}
TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this);
synchronized (this) {
if (terminated) {
return REJECTED;
}
tasks.add(sr);
}
try {
Future<?> f = executor.schedule(sr, delay, unit);
sr.set(f);
} catch (RejectedExecutionException ex) {
sr.dispose();
return REJECTED;
}
return sr;
}
@Override
public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
if (terminated) {
return REJECTED;
}
TimedPeriodicScheduledRunnable sr = new TimedPeriodicScheduledRunnable(task, this);
synchronized (this) {
if (terminated) {
return REJECTED;
}
tasks.add(sr);
}
try {
Future<?> f = executor.scheduleAtFixedRate(sr, initialDelay, period, unit);
sr.set(f);
} catch (RejectedExecutionException ex) {
sr.dispose();
return REJECTED;
}
return sr;
}
@Override
public void shutdown() {
if (terminated) {
return;
}
terminated = true;
OpenHashSet<CancelFuture> set;
synchronized (this) {
set = tasks;
if (set == null) {
return;
}
tasks = null;
}
if (!set.isEmpty()) {
Object[] keys = set.keys();
for (Object c : keys) {
if (c != null) {
((CancelFuture) c).cancelFuture();
}
}
}
}
}
interface CancelFuture {
void cancelFuture();
}
static final class TimedScheduledRunnable
extends AtomicReference<Future<?>> implements Runnable, Cancellation, CancelFuture {
/** */
private static final long serialVersionUID = 2284024836904862408L;
final Runnable task;
final SingleTimedSchedulerWorker parent;
volatile Thread current;
static final AtomicReferenceFieldUpdater<TimedScheduledRunnable, Thread> CURRENT =
AtomicReferenceFieldUpdater.newUpdater(TimedScheduledRunnable.class, Thread.class, "current");
static final Runnable EMPTY = new Runnable() {
@Override
public void run() {
}
};
static final Future<?> CANCELLED_FUTURE = new FutureTask<>(EMPTY, null);
static final Future<?> FINISHED = new FutureTask<>(EMPTY, null);
public TimedScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) {
this.task = task;
this.parent = parent;
}
@Override
public void run() {
CURRENT.lazySet(this, Thread.currentThread());
try {
try {
task.run();
} catch (Throwable e) {
Operators.onErrorDropped(e);
}
} finally {
for (; ; ) {
Future<?> a = get();
if (a == CANCELLED_FUTURE) {
break;
}
if (compareAndSet(a, FINISHED)) {
if (a != null) {
doCancel(a);
}
parent.delete(this);
break;
}
}
CURRENT.lazySet(this, null);
}
}
void doCancel(Future<?> a) {
a.cancel(Thread.currentThread() != current);
}
@Override
public void cancelFuture() {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (compareAndSet(a, CANCELLED_FUTURE)) {
if (a != null) {
doCancel(a);
}
return;
}
}
}
@Override
public void dispose() {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (compareAndSet(a, CANCELLED_FUTURE)) {
if (a != null) {
doCancel(a);
}
parent.delete(this);
return;
}
}
}
void setFuture(Future<?> f) {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (a == CANCELLED_FUTURE) {
doCancel(a);
return;
}
if (compareAndSet(null, f)) {
return;
}
}
}
@Override
public String toString() {
return "TimedScheduledRunnable[cancelled=" + (get() == CANCELLED_FUTURE) +
", task=" + task +
"]";
}
}
static final class TimedPeriodicScheduledRunnable
extends AtomicReference<Future<?>>
implements Runnable, Cancellation, CancelFuture {
/** */
private static final long serialVersionUID = 2284024836904862408L;
final Runnable task;
final SingleTimedSchedulerWorker parent;
volatile Thread current;
static final AtomicReferenceFieldUpdater<TimedPeriodicScheduledRunnable, Thread> CURRENT =
AtomicReferenceFieldUpdater.newUpdater(TimedPeriodicScheduledRunnable.class, Thread.class, "current");
static final Runnable EMPTY = new Runnable() {
@Override
public void run() {
}
};
static final Future<?> CANCELLED_FUTURE = new FutureTask<>(EMPTY, null);
static final Future<?> FINISHED = new FutureTask<>(EMPTY, null);
public TimedPeriodicScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) {
this.task = task;
this.parent = parent;
}
@Override
public void run() {
CURRENT.lazySet(this, Thread.currentThread());
try {
try {
task.run();
} catch (Throwable ex) {
Operators.onErrorDropped(ex);
for (; ; ) {
Future<?> a = get();
if (a == CANCELLED_FUTURE) {
break;
}
if (compareAndSet(a, FINISHED)) {
parent.delete(this);
break;
}
}
}
} finally {
CURRENT.lazySet(this, null);
}
}
void doCancel(Future<?> a) {
a.cancel(false);
}
@Override
public void cancelFuture() {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (compareAndSet(a, CANCELLED_FUTURE)) {
if (a != null) {
doCancel(a);
}
return;
}
}
}
@Override
public void dispose() {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (compareAndSet(a, CANCELLED_FUTURE)) {
if (a != null) {
doCancel(a);
}
parent.delete(this);
return;
}
}
}
void setFuture(Future<?> f) {
for (; ; ) {
Future<?> a = get();
if (a == FINISHED) {
return;
}
if (a == CANCELLED_FUTURE) {
doCancel(a);
return;
}
if (compareAndSet(null, f)) {
return;
}
}
}
@Override
public String toString() {
return "TimedPeriodicScheduledRunnable[cancelled=" + get() + ", task=" + task + "]";
}
}
}