Override scheduler implementations for Reactor 3.0.0
Workaround for https://github.com/reactor/reactor-core/issues/159
This commit is contained in:
3
pom.xml
3
pom.xml
@@ -23,7 +23,7 @@
|
||||
<rxjava.version>1.1.5</rxjava.version>
|
||||
<spring.tuple.version>1.0.0.RELEASE</spring.tuple.version>
|
||||
<spring.integration.tuple.version>1.0.0.RELEASE</spring.integration.tuple.version>
|
||||
<reactor.version>3.0.0.BUILD-SNAPSHOT</reactor.version>
|
||||
<reactor.version>3.0.0.RELEASE</reactor.version>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
@@ -162,6 +162,7 @@
|
||||
<configuration>
|
||||
<configLocation>checkstyle.xml</configLocation>
|
||||
<headerLocation>checkstyle-header.txt</headerLocation>
|
||||
<suppressionsLocation>checkstyle-suppressions.xml</suppressionsLocation>
|
||||
<encoding>UTF-8</encoding>
|
||||
<consoleOutput>true</consoleOutput>
|
||||
<failsOnError>true</failsOnError>
|
||||
|
||||
0
spring-cloud-stream-reactive/.jdk8
Normal file
0
spring-cloud-stream-reactive/.jdk8
Normal file
@@ -16,10 +16,18 @@
|
||||
|
||||
package org.springframework.cloud.stream.reactive;
|
||||
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
|
||||
import org.springframework.cloud.stream.reactive.reactor.core.scheduler.NoInterruptOnCancelSchedulerFactory;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.util.ClassUtils;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
@@ -27,6 +35,33 @@ import org.springframework.context.annotation.Configuration;
|
||||
@Configuration
|
||||
public class ReactiveSupportAutoConfiguration {
|
||||
|
||||
private static Log log = LogFactory.getLog(ReactiveSupportAutoConfiguration.class);
|
||||
|
||||
static {
|
||||
try {
|
||||
// Override Schedulers.Factory for Reactor 3.0.0 to work around
|
||||
// https://github.com/reactor/reactor-core/issues/159
|
||||
// To be removed once Reactor 3.0.1+ is used
|
||||
Class<?> executorServiceSchedulerClass = ClassUtils.forName("reactor.core.scheduler.ExecutorServiceScheduler", null);
|
||||
try {
|
||||
// simple check that the construction version with the cancellation option is not on the classpath
|
||||
executorServiceSchedulerClass.getConstructor(ExecutorService.class, Boolean.class);
|
||||
}
|
||||
catch (NoSuchMethodException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Overriding Schedulers for Reactor");
|
||||
}
|
||||
Schedulers.setFactory(new NoInterruptOnCancelSchedulerFactory());
|
||||
}
|
||||
}
|
||||
catch (ClassNotFoundException e) {
|
||||
// Ignore if absent - means that we're on a different Reactor version than expected
|
||||
if (log.isInfoEnabled()) {
|
||||
log.info("Class reactor.core.scheduler.ExecutorServiceScheduler not found. Check Reactor version.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter(
|
||||
CompositeMessageConverterFactory compositeMessageConverterFactory) {
|
||||
|
||||
@@ -0,0 +1,346 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import reactor.core.Cancellation;
|
||||
import reactor.core.Exceptions;
|
||||
import reactor.core.publisher.Operators;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.concurrent.OpenHashSet;
|
||||
|
||||
/**
|
||||
* Dynamically creates ExecutorService-based Workers and caches the thread pools, reusing
|
||||
* them once the Workers have been shut down.
|
||||
* <p>
|
||||
* The maximum number of created thread pools is unbounded.
|
||||
* <p>
|
||||
* The default time-to-live for unused thread pools is 60 seconds, use the
|
||||
* appropriate constructor to set a different value.
|
||||
* <p>
|
||||
* This scheduler is not restartable (may be later).
|
||||
*
|
||||
* @author Stephane Maldini
|
||||
*/
|
||||
final class ElasticScheduler implements Scheduler {
|
||||
static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
static final ThreadFactory EVICTOR_FACTORY = r -> {
|
||||
Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet());
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
};
|
||||
|
||||
final ThreadFactory factory;
|
||||
|
||||
final int ttlSeconds;
|
||||
|
||||
static final int DEFAULT_TTL_SECONDS = 60;
|
||||
|
||||
final Queue<ExecutorServiceExpiry> cache;
|
||||
|
||||
final Queue<ExecutorService> all;
|
||||
|
||||
final ScheduledExecutorService evictor;
|
||||
|
||||
static final ExecutorService SHUTDOWN;
|
||||
static {
|
||||
SHUTDOWN = Executors.newSingleThreadExecutor();
|
||||
SHUTDOWN.shutdownNow();
|
||||
}
|
||||
|
||||
volatile boolean shutdown;
|
||||
|
||||
public ElasticScheduler(ThreadFactory factory, int ttlSeconds) {
|
||||
this.ttlSeconds = ttlSeconds;
|
||||
this.factory = factory;
|
||||
this.cache = new ConcurrentLinkedQueue<>();
|
||||
this.all = new ConcurrentLinkedQueue<>();
|
||||
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
|
||||
this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
throw new UnsupportedOperationException("Restarting not supported yet");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
|
||||
evictor.shutdownNow();
|
||||
|
||||
cache.clear();
|
||||
|
||||
ExecutorService exec;
|
||||
|
||||
while ((exec = all.poll()) != null) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorService pick() {
|
||||
if (shutdown) {
|
||||
return SHUTDOWN;
|
||||
}
|
||||
ExecutorService result;
|
||||
ExecutorServiceExpiry e = cache.poll();
|
||||
if (e != null) {
|
||||
return e.executor;
|
||||
}
|
||||
|
||||
result = Executors.newSingleThreadExecutor(factory);
|
||||
all.offer(result);
|
||||
if (shutdown) {
|
||||
all.remove(result);
|
||||
return SHUTDOWN;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
ExecutorService exec = pick();
|
||||
|
||||
Runnable wrapper = () -> {
|
||||
try {
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable ex) {
|
||||
Exceptions.throwIfFatal(ex);
|
||||
Operators.onErrorDropped(ex);
|
||||
}
|
||||
} finally {
|
||||
release(exec);
|
||||
}
|
||||
};
|
||||
Future<?> f;
|
||||
|
||||
try {
|
||||
f = exec.submit(wrapper);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
return REJECTED;
|
||||
}
|
||||
return () -> f.cancel(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Worker createWorker() {
|
||||
ExecutorService exec = pick();
|
||||
return new CachedWorker(exec, this);
|
||||
}
|
||||
|
||||
void release(ExecutorService exec) {
|
||||
if (exec != SHUTDOWN && !shutdown) {
|
||||
ExecutorServiceExpiry e = new ExecutorServiceExpiry(exec, System.currentTimeMillis() + ttlSeconds * 1000L);
|
||||
cache.offer(e);
|
||||
if (shutdown) {
|
||||
if (cache.remove(e)) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void eviction() {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
List<ExecutorServiceExpiry> list = new ArrayList<>(cache);
|
||||
for (ExecutorServiceExpiry e : list) {
|
||||
if (e.expireMillis < now) {
|
||||
if (cache.remove(e)) {
|
||||
e.executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static final class ExecutorServiceExpiry {
|
||||
final ExecutorService executor;
|
||||
final long expireMillis;
|
||||
|
||||
public ExecutorServiceExpiry(ExecutorService executor, long expireMillis) {
|
||||
this.executor = executor;
|
||||
this.expireMillis = expireMillis;
|
||||
}
|
||||
}
|
||||
|
||||
static final class CachedWorker implements Worker {
|
||||
|
||||
final ExecutorService executor;
|
||||
|
||||
final ElasticScheduler parent;
|
||||
|
||||
volatile boolean shutdown;
|
||||
|
||||
OpenHashSet<CachedTask> tasks;
|
||||
|
||||
public CachedWorker(ExecutorService executor, ElasticScheduler parent) {
|
||||
this.executor = executor;
|
||||
this.parent = parent;
|
||||
this.tasks = new OpenHashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
CachedTask ct = new CachedTask(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
tasks.add(ct);
|
||||
}
|
||||
|
||||
Future<?> f;
|
||||
try {
|
||||
f = executor.submit(ct);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
ct.setFuture(f);
|
||||
|
||||
return ct;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
OpenHashSet<CachedTask> set;
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
set = tasks;
|
||||
tasks = null;
|
||||
}
|
||||
|
||||
if (!set.isEmpty()) {
|
||||
Object[] keys = set.keys();
|
||||
for (Object o : keys) {
|
||||
if (o != null) {
|
||||
((CachedTask)o).cancelFuture();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
parent.release(executor);
|
||||
}
|
||||
|
||||
void remove(CachedTask task) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
tasks.remove(task);
|
||||
}
|
||||
}
|
||||
|
||||
static final class CachedTask
|
||||
extends AtomicReference<Future<?>>
|
||||
implements Runnable, Cancellation {
|
||||
/** */
|
||||
private static final long serialVersionUID = 6799295393954430738L;
|
||||
|
||||
final Runnable run;
|
||||
|
||||
final CachedWorker parent;
|
||||
|
||||
volatile boolean cancelled;
|
||||
|
||||
static final FutureTask<Object> CANCELLED = new FutureTask<>(() -> { }, null);
|
||||
|
||||
static final FutureTask<Object> FINISHED = new FutureTask<>(() -> { }, null);
|
||||
|
||||
public CachedTask(Runnable run, CachedWorker parent) {
|
||||
this.run = run;
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (!parent.shutdown && !cancelled) {
|
||||
run.run();
|
||||
}
|
||||
} catch (Throwable ex) {
|
||||
Exceptions.throwIfFatal(ex);
|
||||
Operators.onErrorDropped(ex);
|
||||
} finally {
|
||||
lazySet(FINISHED);
|
||||
parent.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
cancelled = true;
|
||||
cancelFuture();
|
||||
}
|
||||
|
||||
void setFuture(Future<?> f) {
|
||||
if (!compareAndSet(null, f)) {
|
||||
if (get() != FINISHED) {
|
||||
f.cancel(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cancelFuture() {
|
||||
Future<?> f = get();
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
f = getAndSet(CANCELLED);
|
||||
if (f != null && f != CANCELLED && f != FINISHED) {
|
||||
f.cancel(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
|
||||
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
import reactor.core.scheduler.TimedScheduler;
|
||||
|
||||
/**
|
||||
* {@link reactor.core.scheduler.Schedulers.Factory} implementation that pulls in
|
||||
* {@link reactor.core.scheduler.Scheduler} implementations with fixes for
|
||||
* <a href="https://github.com/reactor/reactor-core/issues/159"/>.
|
||||
*
|
||||
* This is a temporary solution, until a Reactor release is available.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class NoInterruptOnCancelSchedulerFactory implements Schedulers.Factory {
|
||||
|
||||
@Override
|
||||
public Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) {
|
||||
return new ElasticScheduler(threadFactory, ttlSeconds);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) {
|
||||
return new ParallelScheduler(parallelism, threadFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scheduler newSingle(ThreadFactory threadFactory) {
|
||||
return new SingleScheduler(threadFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimedScheduler newTimer(ThreadFactory threadFactory) {
|
||||
return new SingleTimedScheduler(threadFactory);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,323 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
import reactor.core.Cancellation;
|
||||
import reactor.core.Exceptions;
|
||||
import reactor.core.publisher.Operators;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.concurrent.OpenHashSet;
|
||||
|
||||
/**
|
||||
* Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers
|
||||
* and is suited for parallel work.
|
||||
* @author Stephane Maldini
|
||||
*/
|
||||
final class ParallelScheduler implements Scheduler {
|
||||
|
||||
static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
final int n;
|
||||
|
||||
final ThreadFactory factory;
|
||||
|
||||
volatile ExecutorService[] executors;
|
||||
static final AtomicReferenceFieldUpdater<ParallelScheduler, ExecutorService[]> EXECUTORS =
|
||||
AtomicReferenceFieldUpdater.newUpdater(ParallelScheduler.class, ExecutorService[].class, "executors");
|
||||
|
||||
static final ExecutorService[] SHUTDOWN = new ExecutorService[0];
|
||||
|
||||
static final ExecutorService TERMINATED;
|
||||
static {
|
||||
TERMINATED = Executors.newSingleThreadExecutor();
|
||||
TERMINATED.shutdownNow();
|
||||
}
|
||||
|
||||
int roundRobin;
|
||||
|
||||
ParallelScheduler(int n, ThreadFactory factory) {
|
||||
if (n <= 0) {
|
||||
throw new IllegalArgumentException("n > 0 required but it was " + n);
|
||||
}
|
||||
this.n = n;
|
||||
this.factory = factory;
|
||||
init(n);
|
||||
}
|
||||
|
||||
void init(int n) {
|
||||
ExecutorService[] a = new ExecutorService[n];
|
||||
for (int i = 0; i < n; i++) {
|
||||
a[i] = Executors.newSingleThreadExecutor(factory);
|
||||
}
|
||||
EXECUTORS.lazySet(this, a);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
ExecutorService[] b = null;
|
||||
for (;;) {
|
||||
ExecutorService[] a = executors;
|
||||
if (a != SHUTDOWN) {
|
||||
if (b != null) {
|
||||
for (ExecutorService exec : b) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (b == null) {
|
||||
b = new ExecutorService[n];
|
||||
for (int i = 0; i < n; i++) {
|
||||
b[i] = Executors.newSingleThreadExecutor(factory);
|
||||
}
|
||||
}
|
||||
|
||||
if (EXECUTORS.compareAndSet(this, a, b)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
ExecutorService[] a = executors;
|
||||
if (a != SHUTDOWN) {
|
||||
a = EXECUTORS.getAndSet(this, SHUTDOWN);
|
||||
if (a != SHUTDOWN) {
|
||||
for (ExecutorService exec : a) {
|
||||
exec.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ExecutorService pick() {
|
||||
ExecutorService[] a = executors;
|
||||
if (a != SHUTDOWN) {
|
||||
// ignoring the race condition here, its already random who gets which executor
|
||||
int idx = roundRobin;
|
||||
if (idx == n) {
|
||||
idx = 0;
|
||||
roundRobin = 0;
|
||||
} else {
|
||||
roundRobin = idx + 1;
|
||||
}
|
||||
return a[idx];
|
||||
}
|
||||
return TERMINATED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
ExecutorService exec = pick();
|
||||
Future<?> f = exec.submit(task);
|
||||
return () -> f.cancel(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Worker createWorker() {
|
||||
return new ParallelWorker(pick());
|
||||
}
|
||||
|
||||
static final class ParallelWorker implements Worker {
|
||||
final ExecutorService exec;
|
||||
|
||||
OpenHashSet<ParallelWorkerTask> tasks;
|
||||
|
||||
volatile boolean shutdown;
|
||||
|
||||
public ParallelWorker(ExecutorService exec) {
|
||||
this.exec = exec;
|
||||
this.tasks = new OpenHashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
ParallelWorkerTask pw = new ParallelWorkerTask(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
tasks.add(pw);
|
||||
}
|
||||
|
||||
Future<?> f;
|
||||
try {
|
||||
f = exec.submit(pw);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
if (shutdown) {
|
||||
f.cancel(true);
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
pw.setFuture(f);
|
||||
|
||||
return pw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
OpenHashSet<ParallelWorkerTask> set;
|
||||
synchronized (this) {
|
||||
set = tasks;
|
||||
tasks = null;
|
||||
}
|
||||
|
||||
if (set != null) {
|
||||
Object[] a = set.keys();
|
||||
for (Object o : a) {
|
||||
if (o != null) {
|
||||
((ParallelWorkerTask)o).cancelFuture();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void remove(ParallelWorkerTask task) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
tasks.remove(task);
|
||||
}
|
||||
}
|
||||
|
||||
int pendingTasks() {
|
||||
if (shutdown) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
OpenHashSet<?> set = tasks;
|
||||
if (set != null) {
|
||||
return set.size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static final class ParallelWorkerTask implements Runnable, Cancellation {
|
||||
final Runnable run;
|
||||
|
||||
final ParallelWorker parent;
|
||||
|
||||
volatile boolean cancelled;
|
||||
|
||||
volatile Future<?> future;
|
||||
@SuppressWarnings("rawtypes")
|
||||
static final AtomicReferenceFieldUpdater<ParallelWorkerTask, Future> FUTURE =
|
||||
AtomicReferenceFieldUpdater.newUpdater(ParallelWorkerTask.class, Future.class, "future");
|
||||
|
||||
static final Future<Object> FINISHED = CompletableFuture.completedFuture(null);
|
||||
static final Future<Object> CANCELLED = CompletableFuture.completedFuture(null);
|
||||
|
||||
public ParallelWorkerTask(Runnable run, ParallelWorker parent) {
|
||||
this.run = run;
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (cancelled || parent.shutdown) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
try {
|
||||
run.run();
|
||||
} catch (Throwable ex) {
|
||||
Exceptions.throwIfFatal(ex);
|
||||
Operators.onErrorDropped(ex);
|
||||
}
|
||||
} finally {
|
||||
for (;;) {
|
||||
Future<?> f = future;
|
||||
if (f == CANCELLED) {
|
||||
break;
|
||||
}
|
||||
if (FUTURE.compareAndSet(this, f, FINISHED)) {
|
||||
parent.remove(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
if (!cancelled) {
|
||||
cancelled = true;
|
||||
|
||||
Future<?> f = future;
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
f = FUTURE.getAndSet(this, CANCELLED);
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
if (f != null) {
|
||||
f.cancel(parent.shutdown);
|
||||
}
|
||||
|
||||
parent.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setFuture(Future<?> f) {
|
||||
if (future != null || !FUTURE.compareAndSet(this, null, f)) {
|
||||
if (future != FINISHED) {
|
||||
f.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cancelFuture() {
|
||||
Future<?> f = future;
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
f = FUTURE.getAndSet(this, CANCELLED);
|
||||
if (f != null && f != CANCELLED && f != FINISHED) {
|
||||
f.cancel(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,293 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
import reactor.core.Cancellation;
|
||||
import reactor.core.Exceptions;
|
||||
import reactor.core.publisher.Operators;
|
||||
import reactor.core.scheduler.Scheduler;
|
||||
import reactor.util.concurrent.OpenHashSet;
|
||||
|
||||
/**
|
||||
* Scheduler that works with a single-threaded ExecutorService and is suited for
|
||||
* same-thread work (like an event dispatch thread).
|
||||
*/
|
||||
final class SingleScheduler implements Scheduler {
|
||||
|
||||
static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
final ThreadFactory factory;
|
||||
|
||||
volatile ExecutorService executor;
|
||||
static final AtomicReferenceFieldUpdater<SingleScheduler, ExecutorService> EXECUTORS =
|
||||
AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class, ExecutorService.class, "executor");
|
||||
|
||||
static final ExecutorService TERMINATED;
|
||||
static {
|
||||
TERMINATED = Executors.newSingleThreadExecutor();
|
||||
TERMINATED.shutdownNow();
|
||||
}
|
||||
|
||||
public SingleScheduler(ThreadFactory factory) {
|
||||
this.factory = factory;
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
EXECUTORS.lazySet(this, Executors.newSingleThreadExecutor(factory));
|
||||
}
|
||||
|
||||
public boolean isStarted() {
|
||||
return executor != TERMINATED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
ExecutorService b = null;
|
||||
for (;;) {
|
||||
ExecutorService a = executor;
|
||||
if (a != TERMINATED) {
|
||||
if (b != null) {
|
||||
b.shutdownNow();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (b == null) {
|
||||
b = Executors.newSingleThreadExecutor(factory);
|
||||
}
|
||||
|
||||
if (EXECUTORS.compareAndSet(this, a, b)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
ExecutorService a = executor;
|
||||
if (a != TERMINATED) {
|
||||
a = EXECUTORS.getAndSet(this, TERMINATED);
|
||||
if (a != TERMINATED) {
|
||||
a.shutdownNow();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
try {
|
||||
Future<?> f = executor.submit(task);
|
||||
return () -> f.cancel(false);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
return REJECTED;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Worker createWorker() {
|
||||
return new SingleWorker(executor);
|
||||
}
|
||||
|
||||
static final class SingleWorker implements Worker {
|
||||
final ExecutorService exec;
|
||||
|
||||
OpenHashSet<SingleWorkerTask> tasks;
|
||||
|
||||
volatile boolean shutdown;
|
||||
|
||||
public SingleWorker(ExecutorService exec) {
|
||||
this.exec = exec;
|
||||
this.tasks = new OpenHashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
SingleWorkerTask pw = new SingleWorkerTask(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return REJECTED;
|
||||
}
|
||||
tasks.add(pw);
|
||||
}
|
||||
|
||||
Future<?> f;
|
||||
try {
|
||||
f = exec.submit(pw);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
if (shutdown) {
|
||||
f.cancel(true);
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
pw.setFuture(f);
|
||||
|
||||
return pw;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
shutdown = true;
|
||||
OpenHashSet<SingleWorkerTask> set;
|
||||
synchronized (this) {
|
||||
set = tasks;
|
||||
tasks = null;
|
||||
}
|
||||
|
||||
if (set != null && !set.isEmpty()) {
|
||||
Object[] a = set.keys();
|
||||
for (Object o : a) {
|
||||
if (o != null) {
|
||||
((SingleWorkerTask)o).cancelFuture();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void remove(SingleWorkerTask task) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
if (shutdown) {
|
||||
return;
|
||||
}
|
||||
tasks.remove(task);
|
||||
}
|
||||
}
|
||||
|
||||
int pendingTasks() {
|
||||
if (shutdown) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
OpenHashSet<?> set = tasks;
|
||||
if (set != null) {
|
||||
return set.size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static final class SingleWorkerTask implements Runnable, Cancellation {
|
||||
final Runnable run;
|
||||
|
||||
final SingleWorker parent;
|
||||
|
||||
volatile boolean cancelled;
|
||||
|
||||
volatile Future<?> future;
|
||||
@SuppressWarnings("rawtypes")
|
||||
static final AtomicReferenceFieldUpdater<SingleWorkerTask, Future> FUTURE =
|
||||
AtomicReferenceFieldUpdater.newUpdater(SingleWorkerTask.class, Future.class, "future");
|
||||
|
||||
static final Future<Object> FINISHED = CompletableFuture.completedFuture(null);
|
||||
static final Future<Object> CANCELLED = CompletableFuture.completedFuture(null);
|
||||
|
||||
public SingleWorkerTask(Runnable run, SingleWorker parent) {
|
||||
this.run = run;
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (cancelled || parent.shutdown) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
try {
|
||||
run.run();
|
||||
} catch (Throwable ex) {
|
||||
Exceptions.throwIfFatal(ex);
|
||||
Operators.onErrorDropped(ex);
|
||||
}
|
||||
} finally {
|
||||
for (;;) {
|
||||
Future<?> f = future;
|
||||
if (f == CANCELLED) {
|
||||
break;
|
||||
}
|
||||
if (FUTURE.compareAndSet(this, f, FINISHED)) {
|
||||
parent.remove(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
if (!cancelled) {
|
||||
cancelled = true;
|
||||
|
||||
Future<?> f = future;
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
f = FUTURE.getAndSet(this, CANCELLED);
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
if (f != null) {
|
||||
f.cancel(false);
|
||||
}
|
||||
|
||||
parent.remove(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setFuture(Future<?> f) {
|
||||
if (future != null || !FUTURE.compareAndSet(this, null, f)) {
|
||||
if (future != FINISHED) {
|
||||
f.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void cancelFuture() {
|
||||
Future<?> f = future;
|
||||
if (f != CANCELLED && f != FINISHED) {
|
||||
f = FUTURE.getAndSet(this, CANCELLED);
|
||||
if (f != null && f != CANCELLED && f != FINISHED) {
|
||||
f.cancel(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,464 @@
|
||||
/*
|
||||
* Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.springframework.cloud.stream.reactive.reactor.core.scheduler;
|
||||
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
|
||||
|
||||
import reactor.core.Cancellation;
|
||||
import reactor.core.publisher.Operators;
|
||||
import reactor.core.scheduler.TimedScheduler;
|
||||
import reactor.util.concurrent.OpenHashSet;
|
||||
|
||||
/**
|
||||
* A TimedScheduler with an embedded, single-threaded ScheduledExecutorService,
|
||||
* shared among all workers.
|
||||
*/
|
||||
final class SingleTimedScheduler implements TimedScheduler {
|
||||
|
||||
static final AtomicLong COUNTER = new AtomicLong();
|
||||
|
||||
final ScheduledThreadPoolExecutor executor;
|
||||
|
||||
/**
|
||||
* Constructs a new SingleTimedScheduler with the given thread factory.
|
||||
* @param threadFactory the thread factory to use
|
||||
*/
|
||||
SingleTimedScheduler(ThreadFactory threadFactory) {
|
||||
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, threadFactory);
|
||||
e.setRemoveOnCancelPolicy(true);
|
||||
executor = e;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
try {
|
||||
Future<?> f = executor.submit(task);
|
||||
return () -> f.cancel(false);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
return REJECTED;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task, long delay, TimeUnit unit) {
|
||||
try {
|
||||
Future<?> f = executor.schedule(task, delay, unit);
|
||||
return () -> f.cancel(false);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
return REJECTED;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
|
||||
try {
|
||||
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
|
||||
return () -> f.cancel(false);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
return REJECTED;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
throw new UnsupportedOperationException("Not supported, yet.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimedWorker createWorker() {
|
||||
return new SingleTimedSchedulerWorker(executor);
|
||||
}
|
||||
|
||||
static final class SingleTimedSchedulerWorker implements TimedWorker {
|
||||
final ScheduledThreadPoolExecutor executor;
|
||||
|
||||
OpenHashSet<CancelFuture> tasks;
|
||||
|
||||
volatile boolean terminated;
|
||||
|
||||
public SingleTimedSchedulerWorker(ScheduledThreadPoolExecutor executor) {
|
||||
this.executor = executor;
|
||||
this.tasks = new OpenHashSet<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
tasks.add(sr);
|
||||
}
|
||||
|
||||
try {
|
||||
Future<?> f = executor.submit(sr);
|
||||
sr.set(f);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
sr.dispose();
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
return sr;
|
||||
}
|
||||
|
||||
void delete(CancelFuture r) {
|
||||
synchronized (this) {
|
||||
if (!terminated) {
|
||||
tasks.remove(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedule(Runnable task, long delay, TimeUnit unit) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
tasks.add(sr);
|
||||
}
|
||||
|
||||
try {
|
||||
Future<?> f = executor.schedule(sr, delay, unit);
|
||||
sr.set(f);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
sr.dispose();
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
return sr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
TimedPeriodicScheduledRunnable sr = new TimedPeriodicScheduledRunnable(task, this);
|
||||
|
||||
synchronized (this) {
|
||||
if (terminated) {
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
tasks.add(sr);
|
||||
}
|
||||
|
||||
try {
|
||||
Future<?> f = executor.scheduleAtFixedRate(sr, initialDelay, period, unit);
|
||||
sr.set(f);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
sr.dispose();
|
||||
return REJECTED;
|
||||
}
|
||||
|
||||
return sr;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
if (terminated) {
|
||||
return;
|
||||
}
|
||||
terminated = true;
|
||||
|
||||
OpenHashSet<CancelFuture> set;
|
||||
|
||||
synchronized (this) {
|
||||
set = tasks;
|
||||
if (set == null) {
|
||||
return;
|
||||
}
|
||||
tasks = null;
|
||||
}
|
||||
|
||||
if (!set.isEmpty()) {
|
||||
Object[] keys = set.keys();
|
||||
for (Object c : keys) {
|
||||
if (c != null) {
|
||||
((CancelFuture)c).cancelFuture();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
interface CancelFuture {
|
||||
void cancelFuture();
|
||||
}
|
||||
|
||||
static final class TimedScheduledRunnable
|
||||
extends AtomicReference<Future<?>> implements Runnable, Cancellation, CancelFuture {
|
||||
/** */
|
||||
private static final long serialVersionUID = 2284024836904862408L;
|
||||
|
||||
final Runnable task;
|
||||
|
||||
final SingleTimedSchedulerWorker parent;
|
||||
|
||||
volatile Thread current;
|
||||
static final AtomicReferenceFieldUpdater<TimedScheduledRunnable, Thread> CURRENT =
|
||||
AtomicReferenceFieldUpdater.newUpdater(TimedScheduledRunnable.class, Thread.class, "current");
|
||||
|
||||
static final Runnable EMPTY = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
static final Future<?> CANCELLED_FUTURE = new FutureTask<>(EMPTY, null);
|
||||
|
||||
static final Future<?> FINISHED = new FutureTask<>(EMPTY, null);
|
||||
|
||||
public TimedScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) {
|
||||
this.task = task;
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
CURRENT.lazySet(this, Thread.currentThread());
|
||||
try {
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable e) {
|
||||
Operators.onErrorDropped(e);
|
||||
}
|
||||
} finally {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == CANCELLED_FUTURE) {
|
||||
break;
|
||||
}
|
||||
if (compareAndSet(a, FINISHED)) {
|
||||
if (a != null) {
|
||||
doCancel(a);
|
||||
}
|
||||
parent.delete(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
CURRENT.lazySet(this, null);
|
||||
}
|
||||
}
|
||||
|
||||
void doCancel(Future<?> a) {
|
||||
a.cancel(Thread.currentThread() != current);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelFuture() {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(a, CANCELLED_FUTURE)) {
|
||||
if (a != null) {
|
||||
doCancel(a);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(a, CANCELLED_FUTURE)) {
|
||||
if (a != null) {
|
||||
doCancel(a);
|
||||
}
|
||||
parent.delete(this);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void setFuture(Future<?> f) {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (a == CANCELLED_FUTURE) {
|
||||
doCancel(a);
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(null, f)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TimedScheduledRunnable[cancelled=" + (get() == CANCELLED_FUTURE) +
|
||||
", task=" + task +
|
||||
"]";
|
||||
}
|
||||
}
|
||||
|
||||
static final class TimedPeriodicScheduledRunnable
|
||||
extends AtomicReference<Future<?>>
|
||||
implements Runnable, Cancellation, CancelFuture {
|
||||
/** */
|
||||
private static final long serialVersionUID = 2284024836904862408L;
|
||||
|
||||
final Runnable task;
|
||||
|
||||
final SingleTimedSchedulerWorker parent;
|
||||
|
||||
volatile Thread current;
|
||||
static final AtomicReferenceFieldUpdater<TimedPeriodicScheduledRunnable, Thread> CURRENT =
|
||||
AtomicReferenceFieldUpdater.newUpdater(TimedPeriodicScheduledRunnable.class, Thread.class, "current");
|
||||
|
||||
static final Runnable EMPTY = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
static final Future<?> CANCELLED_FUTURE = new FutureTask<>(EMPTY, null);
|
||||
|
||||
static final Future<?> FINISHED = new FutureTask<>(EMPTY, null);
|
||||
|
||||
public TimedPeriodicScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) {
|
||||
this.task = task;
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
CURRENT.lazySet(this, Thread.currentThread());
|
||||
try {
|
||||
try {
|
||||
task.run();
|
||||
} catch (Throwable ex) {
|
||||
Operators.onErrorDropped(ex);
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == CANCELLED_FUTURE) {
|
||||
break;
|
||||
}
|
||||
if (compareAndSet(a, FINISHED)) {
|
||||
parent.delete(this);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
CURRENT.lazySet(this, null);
|
||||
}
|
||||
}
|
||||
|
||||
void doCancel(Future<?> a) {
|
||||
a.cancel(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancelFuture() {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(a, CANCELLED_FUTURE)) {
|
||||
if (a != null) {
|
||||
doCancel(a);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dispose() {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(a, CANCELLED_FUTURE)) {
|
||||
if (a != null) {
|
||||
doCancel(a);
|
||||
}
|
||||
parent.delete(this);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void setFuture(Future<?> f) {
|
||||
for (;;) {
|
||||
Future<?> a = get();
|
||||
if (a == FINISHED) {
|
||||
return;
|
||||
}
|
||||
if (a == CANCELLED_FUTURE) {
|
||||
doCancel(a);
|
||||
return;
|
||||
}
|
||||
if (compareAndSet(null, f)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TimedPeriodicScheduledRunnable[cancelled=" + get() + ", task=" + task + "]";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
/*
|
||||
* Copyright 2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.stream.reactive;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
import org.springframework.cloud.stream.messaging.Sink;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Test validating that a fix for <a href="https://github.com/reactor/reactor-core/issues/159"/>
|
||||
* is present.
|
||||
*
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
public class StreamListenerInterruptionTests {
|
||||
|
||||
@Test
|
||||
public void testSubscribersNotInterrupted() throws Exception {
|
||||
ConfigurableApplicationContext context = SpringApplication.run(TestTimeWindows.class,
|
||||
"--server.port=0");
|
||||
Sink sink = context.getBean(Sink.class);
|
||||
TestTimeWindows testTimeWindows = context.getBean(TestTimeWindows.class);
|
||||
sink.input().send(MessageBuilder.withPayload("hello1").build());
|
||||
sink.input().send(MessageBuilder.withPayload("hello2").build());
|
||||
sink.input().send(MessageBuilder.withPayload("hello3").build());
|
||||
assertThat(testTimeWindows.latch.await(5, TimeUnit.SECONDS)).isTrue();
|
||||
assertThat(testTimeWindows.interruptionState).isNotNull();
|
||||
assertThat(testTimeWindows.interruptionState).isFalse();
|
||||
context.close();
|
||||
}
|
||||
|
||||
@EnableBinding(Sink.class)
|
||||
@EnableAutoConfiguration
|
||||
public static class TestTimeWindows {
|
||||
|
||||
public CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
public Boolean interruptionState;
|
||||
|
||||
@StreamListener
|
||||
public void receive(@Input(Sink.INPUT) Flux<String> input) {
|
||||
input.window(Duration.ofMillis(500), Duration.ofMillis(100))
|
||||
.flatMap(w -> w.reduce("", (x, y) -> x + y))
|
||||
.subscribe(x -> {
|
||||
interruptionState = Thread.currentThread().isInterrupted();
|
||||
latch.countDown();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
<?xml version="1.0"?>
|
||||
<!DOCTYPE suppressions PUBLIC
|
||||
"-//Puppy Crawl//DTD Suppressions 1.1//EN"
|
||||
"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
|
||||
<suppressions>
|
||||
<suppress files="[\\/]src[\\/]main[\\/]java[\\/]org[\\/]springframework[\\/]cloud[\\/]stream[\\/]reactive[\\/]reactor[\\/]core[\\/]scheduler[\\/]" checks=".*" />
|
||||
</suppressions>
|
||||
Reference in New Issue
Block a user