From 1b62be0217be7a5af5864d6947d60864cdb55da7 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Thu, 25 Aug 2016 12:10:58 -0400 Subject: [PATCH] Override scheduler implementations for Reactor 3.0.0 Workaround for https://github.com/reactor/reactor-core/issues/159 --- pom.xml | 3 +- spring-cloud-stream-reactive/.jdk8 | 0 .../ReactiveSupportAutoConfiguration.java | 35 ++ .../core/scheduler/ElasticScheduler.java | 346 +++++++++++++ .../NoInterruptOnCancelSchedulerFactory.java | 55 +++ .../core/scheduler/ParallelScheduler.java | 323 ++++++++++++ .../core/scheduler/SingleScheduler.java | 293 +++++++++++ .../core/scheduler/SingleTimedScheduler.java | 464 ++++++++++++++++++ .../StreamListenerInterruptionTests.java | 78 +++ .../resources/checkstyle-suppressions.xml | 7 + 10 files changed, 1603 insertions(+), 1 deletion(-) create mode 100644 spring-cloud-stream-reactive/.jdk8 create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java create mode 100644 spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java create mode 100644 spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerInterruptionTests.java create mode 100644 spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml diff --git a/pom.xml b/pom.xml index 6702aee5b..a6299940d 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ 1.1.5 1.0.0.RELEASE 1.0.0.RELEASE - 3.0.0.BUILD-SNAPSHOT + 3.0.0.RELEASE @@ -162,6 +162,7 @@ checkstyle.xml checkstyle-header.txt + checkstyle-suppressions.xml UTF-8 true true diff --git a/spring-cloud-stream-reactive/.jdk8 b/spring-cloud-stream-reactive/.jdk8 new file mode 100644 index 000000000..e69de29bb diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java index c1ff3945f..4dc317058 100644 --- a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/ReactiveSupportAutoConfiguration.java @@ -16,10 +16,18 @@ package org.springframework.cloud.stream.reactive; +import java.util.concurrent.ExecutorService; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.scheduler.Schedulers; + import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory; +import org.springframework.cloud.stream.reactive.reactor.core.scheduler.NoInterruptOnCancelSchedulerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.util.ClassUtils; /** * @author Marius Bogoevici @@ -27,6 +35,33 @@ import org.springframework.context.annotation.Configuration; @Configuration public class ReactiveSupportAutoConfiguration { + private static Log log = LogFactory.getLog(ReactiveSupportAutoConfiguration.class); + + static { + try { + // Override Schedulers.Factory for Reactor 3.0.0 to work around + // https://github.com/reactor/reactor-core/issues/159 + // To be removed once Reactor 3.0.1+ is used + Class executorServiceSchedulerClass = ClassUtils.forName("reactor.core.scheduler.ExecutorServiceScheduler", null); + try { + // simple check that the construction version with the cancellation option is not on the classpath + executorServiceSchedulerClass.getConstructor(ExecutorService.class, Boolean.class); + } + catch (NoSuchMethodException e) { + if (log.isDebugEnabled()) { + log.debug("Overriding Schedulers for Reactor"); + } + Schedulers.setFactory(new NoInterruptOnCancelSchedulerFactory()); + } + } + catch (ClassNotFoundException e) { + // Ignore if absent - means that we're on a different Reactor version than expected + if (log.isInfoEnabled()) { + log.info("Class reactor.core.scheduler.ExecutorServiceScheduler not found. Check Reactor version."); + } + } + } + @Bean public MessageChannelToInputFluxParameterAdapter messageChannelToInputFluxArgumentAdapter( CompositeMessageConverterFactory compositeMessageConverterFactory) { diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java new file mode 100644 index 000000000..8101de45b --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ElasticScheduler.java @@ -0,0 +1,346 @@ +/* + * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.stream.reactive.reactor.core.scheduler; + +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import reactor.core.Cancellation; +import reactor.core.Exceptions; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.Scheduler; +import reactor.util.concurrent.OpenHashSet; + +/** + * Dynamically creates ExecutorService-based Workers and caches the thread pools, reusing + * them once the Workers have been shut down. + *

+ * The maximum number of created thread pools is unbounded. + *

+ * The default time-to-live for unused thread pools is 60 seconds, use the + * appropriate constructor to set a different value. + *

+ * This scheduler is not restartable (may be later). + * + * @author Stephane Maldini + */ +final class ElasticScheduler implements Scheduler { + static final AtomicLong COUNTER = new AtomicLong(); + + static final ThreadFactory EVICTOR_FACTORY = r -> { + Thread t = new Thread(r, "elastic-evictor-" + COUNTER.incrementAndGet()); + t.setDaemon(true); + return t; + }; + + final ThreadFactory factory; + + final int ttlSeconds; + + static final int DEFAULT_TTL_SECONDS = 60; + + final Queue cache; + + final Queue all; + + final ScheduledExecutorService evictor; + + static final ExecutorService SHUTDOWN; + static { + SHUTDOWN = Executors.newSingleThreadExecutor(); + SHUTDOWN.shutdownNow(); + } + + volatile boolean shutdown; + + public ElasticScheduler(ThreadFactory factory, int ttlSeconds) { + this.ttlSeconds = ttlSeconds; + this.factory = factory; + this.cache = new ConcurrentLinkedQueue<>(); + this.all = new ConcurrentLinkedQueue<>(); + this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY); + this.evictor.scheduleAtFixedRate(this::eviction, ttlSeconds, ttlSeconds, TimeUnit.SECONDS); + } + + @Override + public void start() { + throw new UnsupportedOperationException("Restarting not supported yet"); + } + + @Override + public void shutdown() { + if (shutdown) { + return; + } + shutdown = true; + + evictor.shutdownNow(); + + cache.clear(); + + ExecutorService exec; + + while ((exec = all.poll()) != null) { + exec.shutdownNow(); + } + } + + ExecutorService pick() { + if (shutdown) { + return SHUTDOWN; + } + ExecutorService result; + ExecutorServiceExpiry e = cache.poll(); + if (e != null) { + return e.executor; + } + + result = Executors.newSingleThreadExecutor(factory); + all.offer(result); + if (shutdown) { + all.remove(result); + return SHUTDOWN; + } + return result; + } + + @Override + public Cancellation schedule(Runnable task) { + ExecutorService exec = pick(); + + Runnable wrapper = () -> { + try { + try { + task.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Operators.onErrorDropped(ex); + } + } finally { + release(exec); + } + }; + Future f; + + try { + f = exec.submit(wrapper); + } catch (RejectedExecutionException ex) { + Operators.onErrorDropped(ex); + return REJECTED; + } + return () -> f.cancel(true); + } + + @Override + public Worker createWorker() { + ExecutorService exec = pick(); + return new CachedWorker(exec, this); + } + + void release(ExecutorService exec) { + if (exec != SHUTDOWN && !shutdown) { + ExecutorServiceExpiry e = new ExecutorServiceExpiry(exec, System.currentTimeMillis() + ttlSeconds * 1000L); + cache.offer(e); + if (shutdown) { + if (cache.remove(e)) { + exec.shutdownNow(); + } + } + } + } + + void eviction() { + long now = System.currentTimeMillis(); + + List list = new ArrayList<>(cache); + for (ExecutorServiceExpiry e : list) { + if (e.expireMillis < now) { + if (cache.remove(e)) { + e.executor.shutdownNow(); + } + } + } + } + + static final class ExecutorServiceExpiry { + final ExecutorService executor; + final long expireMillis; + + public ExecutorServiceExpiry(ExecutorService executor, long expireMillis) { + this.executor = executor; + this.expireMillis = expireMillis; + } + } + + static final class CachedWorker implements Worker { + + final ExecutorService executor; + + final ElasticScheduler parent; + + volatile boolean shutdown; + + OpenHashSet tasks; + + public CachedWorker(ExecutorService executor, ElasticScheduler parent) { + this.executor = executor; + this.parent = parent; + this.tasks = new OpenHashSet<>(); + } + + @Override + public Cancellation schedule(Runnable task) { + if (shutdown) { + return REJECTED; + } + + CachedTask ct = new CachedTask(task, this); + + synchronized (this) { + if (shutdown) { + return REJECTED; + } + tasks.add(ct); + } + + Future f; + try { + f = executor.submit(ct); + } catch (RejectedExecutionException ex) { + Operators.onErrorDropped(ex); + return REJECTED; + } + + ct.setFuture(f); + + return ct; + } + + @Override + public void shutdown() { + if (shutdown) { + return; + } + + OpenHashSet set; + synchronized (this) { + if (shutdown) { + return; + } + shutdown = true; + set = tasks; + tasks = null; + } + + if (!set.isEmpty()) { + Object[] keys = set.keys(); + for (Object o : keys) { + if (o != null) { + ((CachedTask)o).cancelFuture(); + } + } + } + + parent.release(executor); + } + + void remove(CachedTask task) { + if (shutdown) { + return; + } + + synchronized (this) { + if (shutdown) { + return; + } + tasks.remove(task); + } + } + + static final class CachedTask + extends AtomicReference> + implements Runnable, Cancellation { + /** */ + private static final long serialVersionUID = 6799295393954430738L; + + final Runnable run; + + final CachedWorker parent; + + volatile boolean cancelled; + + static final FutureTask CANCELLED = new FutureTask<>(() -> { }, null); + + static final FutureTask FINISHED = new FutureTask<>(() -> { }, null); + + public CachedTask(Runnable run, CachedWorker parent) { + this.run = run; + this.parent = parent; + } + + @Override + public void run() { + try { + if (!parent.shutdown && !cancelled) { + run.run(); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Operators.onErrorDropped(ex); + } finally { + lazySet(FINISHED); + parent.remove(this); + } + } + + @Override + public void dispose() { + cancelled = true; + cancelFuture(); + } + + void setFuture(Future f) { + if (!compareAndSet(null, f)) { + if (get() != FINISHED) { + f.cancel(true); + } + } + } + + void cancelFuture() { + Future f = get(); + if (f != CANCELLED && f != FINISHED) { + f = getAndSet(CANCELLED); + if (f != null && f != CANCELLED && f != FINISHED) { + f.cancel(true); + } + } + } + } + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java new file mode 100644 index 000000000..9613ea3d5 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/NoInterruptOnCancelSchedulerFactory.java @@ -0,0 +1,55 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive.reactor.core.scheduler; + +import java.util.concurrent.ThreadFactory; + +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.core.scheduler.TimedScheduler; + +/** + * {@link reactor.core.scheduler.Schedulers.Factory} implementation that pulls in + * {@link reactor.core.scheduler.Scheduler} implementations with fixes for + * . + * + * This is a temporary solution, until a Reactor release is available. + * + * @author Marius Bogoevici + */ +public class NoInterruptOnCancelSchedulerFactory implements Schedulers.Factory { + + @Override + public Scheduler newElastic(int ttlSeconds, ThreadFactory threadFactory) { + return new ElasticScheduler(threadFactory, ttlSeconds); + } + + @Override + public Scheduler newParallel(int parallelism, ThreadFactory threadFactory) { + return new ParallelScheduler(parallelism, threadFactory); + } + + @Override + public Scheduler newSingle(ThreadFactory threadFactory) { + return new SingleScheduler(threadFactory); + } + + @Override + public TimedScheduler newTimer(ThreadFactory threadFactory) { + return new SingleTimedScheduler(threadFactory); + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java new file mode 100644 index 000000000..a062dfcff --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/ParallelScheduler.java @@ -0,0 +1,323 @@ +/* + * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.stream.reactive.reactor.core.scheduler; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import reactor.core.Cancellation; +import reactor.core.Exceptions; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.Scheduler; +import reactor.util.concurrent.OpenHashSet; + +/** + * Scheduler that hosts a fixed pool of single-threaded ExecutorService-based workers + * and is suited for parallel work. + * @author Stephane Maldini + */ +final class ParallelScheduler implements Scheduler { + + static final AtomicLong COUNTER = new AtomicLong(); + + final int n; + + final ThreadFactory factory; + + volatile ExecutorService[] executors; + static final AtomicReferenceFieldUpdater EXECUTORS = + AtomicReferenceFieldUpdater.newUpdater(ParallelScheduler.class, ExecutorService[].class, "executors"); + + static final ExecutorService[] SHUTDOWN = new ExecutorService[0]; + + static final ExecutorService TERMINATED; + static { + TERMINATED = Executors.newSingleThreadExecutor(); + TERMINATED.shutdownNow(); + } + + int roundRobin; + + ParallelScheduler(int n, ThreadFactory factory) { + if (n <= 0) { + throw new IllegalArgumentException("n > 0 required but it was " + n); + } + this.n = n; + this.factory = factory; + init(n); + } + + void init(int n) { + ExecutorService[] a = new ExecutorService[n]; + for (int i = 0; i < n; i++) { + a[i] = Executors.newSingleThreadExecutor(factory); + } + EXECUTORS.lazySet(this, a); + } + + @Override + public void start() { + ExecutorService[] b = null; + for (;;) { + ExecutorService[] a = executors; + if (a != SHUTDOWN) { + if (b != null) { + for (ExecutorService exec : b) { + exec.shutdownNow(); + } + } + return; + } + + if (b == null) { + b = new ExecutorService[n]; + for (int i = 0; i < n; i++) { + b[i] = Executors.newSingleThreadExecutor(factory); + } + } + + if (EXECUTORS.compareAndSet(this, a, b)) { + return; + } + } + } + + @Override + public void shutdown() { + ExecutorService[] a = executors; + if (a != SHUTDOWN) { + a = EXECUTORS.getAndSet(this, SHUTDOWN); + if (a != SHUTDOWN) { + for (ExecutorService exec : a) { + exec.shutdownNow(); + } + } + } + } + + ExecutorService pick() { + ExecutorService[] a = executors; + if (a != SHUTDOWN) { + // ignoring the race condition here, its already random who gets which executor + int idx = roundRobin; + if (idx == n) { + idx = 0; + roundRobin = 0; + } else { + roundRobin = idx + 1; + } + return a[idx]; + } + return TERMINATED; + } + + @Override + public Cancellation schedule(Runnable task) { + ExecutorService exec = pick(); + Future f = exec.submit(task); + return () -> f.cancel(false); + } + + @Override + public Worker createWorker() { + return new ParallelWorker(pick()); + } + + static final class ParallelWorker implements Worker { + final ExecutorService exec; + + OpenHashSet tasks; + + volatile boolean shutdown; + + public ParallelWorker(ExecutorService exec) { + this.exec = exec; + this.tasks = new OpenHashSet<>(); + } + + @Override + public Cancellation schedule(Runnable task) { + if (shutdown) { + return REJECTED; + } + + ParallelWorkerTask pw = new ParallelWorkerTask(task, this); + + synchronized (this) { + if (shutdown) { + return REJECTED; + } + tasks.add(pw); + } + + Future f; + try { + f = exec.submit(pw); + } catch (RejectedExecutionException ex) { + Operators.onErrorDropped(ex); + return REJECTED; + } + + if (shutdown) { + f.cancel(true); + return REJECTED; + } + + pw.setFuture(f); + + return pw; + } + + @Override + public void shutdown() { + if (shutdown) { + return; + } + shutdown = true; + OpenHashSet set; + synchronized (this) { + set = tasks; + tasks = null; + } + + if (set != null) { + Object[] a = set.keys(); + for (Object o : a) { + if (o != null) { + ((ParallelWorkerTask)o).cancelFuture(); + } + } + } + } + + void remove(ParallelWorkerTask task) { + if (shutdown) { + return; + } + + synchronized (this) { + if (shutdown) { + return; + } + tasks.remove(task); + } + } + + int pendingTasks() { + if (shutdown) { + return 0; + } + + synchronized (this) { + OpenHashSet set = tasks; + if (set != null) { + return set.size(); + } + return 0; + } + } + + static final class ParallelWorkerTask implements Runnable, Cancellation { + final Runnable run; + + final ParallelWorker parent; + + volatile boolean cancelled; + + volatile Future future; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater FUTURE = + AtomicReferenceFieldUpdater.newUpdater(ParallelWorkerTask.class, Future.class, "future"); + + static final Future FINISHED = CompletableFuture.completedFuture(null); + static final Future CANCELLED = CompletableFuture.completedFuture(null); + + public ParallelWorkerTask(Runnable run, ParallelWorker parent) { + this.run = run; + this.parent = parent; + } + + @Override + public void run() { + if (cancelled || parent.shutdown) { + return; + } + try { + try { + run.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Operators.onErrorDropped(ex); + } + } finally { + for (;;) { + Future f = future; + if (f == CANCELLED) { + break; + } + if (FUTURE.compareAndSet(this, f, FINISHED)) { + parent.remove(this); + break; + } + } + } + } + + @Override + public void dispose() { + if (!cancelled) { + cancelled = true; + + Future f = future; + if (f != CANCELLED && f != FINISHED) { + f = FUTURE.getAndSet(this, CANCELLED); + if (f != CANCELLED && f != FINISHED) { + if (f != null) { + f.cancel(parent.shutdown); + } + + parent.remove(this); + } + } + } + } + + void setFuture(Future f) { + if (future != null || !FUTURE.compareAndSet(this, null, f)) { + if (future != FINISHED) { + f.cancel(false); + } + } + } + + void cancelFuture() { + Future f = future; + if (f != CANCELLED && f != FINISHED) { + f = FUTURE.getAndSet(this, CANCELLED); + if (f != null && f != CANCELLED && f != FINISHED) { + f.cancel(true); + } + } + } + } + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java new file mode 100644 index 000000000..55bfdee05 --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleScheduler.java @@ -0,0 +1,293 @@ +/* + * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.stream.reactive.reactor.core.scheduler; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import reactor.core.Cancellation; +import reactor.core.Exceptions; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.Scheduler; +import reactor.util.concurrent.OpenHashSet; + +/** + * Scheduler that works with a single-threaded ExecutorService and is suited for + * same-thread work (like an event dispatch thread). + */ +final class SingleScheduler implements Scheduler { + + static final AtomicLong COUNTER = new AtomicLong(); + + final ThreadFactory factory; + + volatile ExecutorService executor; + static final AtomicReferenceFieldUpdater EXECUTORS = + AtomicReferenceFieldUpdater.newUpdater(SingleScheduler.class, ExecutorService.class, "executor"); + + static final ExecutorService TERMINATED; + static { + TERMINATED = Executors.newSingleThreadExecutor(); + TERMINATED.shutdownNow(); + } + + public SingleScheduler(ThreadFactory factory) { + this.factory = factory; + init(); + } + + private void init() { + EXECUTORS.lazySet(this, Executors.newSingleThreadExecutor(factory)); + } + + public boolean isStarted() { + return executor != TERMINATED; + } + + @Override + public void start() { + ExecutorService b = null; + for (;;) { + ExecutorService a = executor; + if (a != TERMINATED) { + if (b != null) { + b.shutdownNow(); + } + return; + } + + if (b == null) { + b = Executors.newSingleThreadExecutor(factory); + } + + if (EXECUTORS.compareAndSet(this, a, b)) { + return; + } + } + } + + @Override + public void shutdown() { + ExecutorService a = executor; + if (a != TERMINATED) { + a = EXECUTORS.getAndSet(this, TERMINATED); + if (a != TERMINATED) { + a.shutdownNow(); + } + } + } + + @Override + public Cancellation schedule(Runnable task) { + try { + Future f = executor.submit(task); + return () -> f.cancel(false); + } catch (RejectedExecutionException ex) { + Operators.onErrorDropped(ex); + return REJECTED; + } + } + + @Override + public Worker createWorker() { + return new SingleWorker(executor); + } + + static final class SingleWorker implements Worker { + final ExecutorService exec; + + OpenHashSet tasks; + + volatile boolean shutdown; + + public SingleWorker(ExecutorService exec) { + this.exec = exec; + this.tasks = new OpenHashSet<>(); + } + + @Override + public Cancellation schedule(Runnable task) { + if (shutdown) { + return REJECTED; + } + + SingleWorkerTask pw = new SingleWorkerTask(task, this); + + synchronized (this) { + if (shutdown) { + return REJECTED; + } + tasks.add(pw); + } + + Future f; + try { + f = exec.submit(pw); + } catch (RejectedExecutionException ex) { + Operators.onErrorDropped(ex); + return REJECTED; + } + + if (shutdown) { + f.cancel(true); + return REJECTED; + } + + pw.setFuture(f); + + return pw; + } + + @Override + public void shutdown() { + if (shutdown) { + return; + } + shutdown = true; + OpenHashSet set; + synchronized (this) { + set = tasks; + tasks = null; + } + + if (set != null && !set.isEmpty()) { + Object[] a = set.keys(); + for (Object o : a) { + if (o != null) { + ((SingleWorkerTask)o).cancelFuture(); + } + } + } + } + + void remove(SingleWorkerTask task) { + if (shutdown) { + return; + } + + synchronized (this) { + if (shutdown) { + return; + } + tasks.remove(task); + } + } + + int pendingTasks() { + if (shutdown) { + return 0; + } + + synchronized (this) { + OpenHashSet set = tasks; + if (set != null) { + return set.size(); + } + return 0; + } + } + + static final class SingleWorkerTask implements Runnable, Cancellation { + final Runnable run; + + final SingleWorker parent; + + volatile boolean cancelled; + + volatile Future future; + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater FUTURE = + AtomicReferenceFieldUpdater.newUpdater(SingleWorkerTask.class, Future.class, "future"); + + static final Future FINISHED = CompletableFuture.completedFuture(null); + static final Future CANCELLED = CompletableFuture.completedFuture(null); + + public SingleWorkerTask(Runnable run, SingleWorker parent) { + this.run = run; + this.parent = parent; + } + + @Override + public void run() { + if (cancelled || parent.shutdown) { + return; + } + try { + try { + run.run(); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + Operators.onErrorDropped(ex); + } + } finally { + for (;;) { + Future f = future; + if (f == CANCELLED) { + break; + } + if (FUTURE.compareAndSet(this, f, FINISHED)) { + parent.remove(this); + break; + } + } + } + } + + @Override + public void dispose() { + if (!cancelled) { + cancelled = true; + + Future f = future; + if (f != CANCELLED && f != FINISHED) { + f = FUTURE.getAndSet(this, CANCELLED); + if (f != CANCELLED && f != FINISHED) { + if (f != null) { + f.cancel(false); + } + + parent.remove(this); + } + } + } + } + + void setFuture(Future f) { + if (future != null || !FUTURE.compareAndSet(this, null, f)) { + if (future != FINISHED) { + f.cancel(false); + } + } + } + + void cancelFuture() { + Future f = future; + if (f != CANCELLED && f != FINISHED) { + f = FUTURE.getAndSet(this, CANCELLED); + if (f != null && f != CANCELLED && f != FINISHED) { + f.cancel(false); + } + } + } + } + } +} diff --git a/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java new file mode 100644 index 000000000..1aecc49bf --- /dev/null +++ b/spring-cloud-stream-reactive/src/main/java/org/springframework/cloud/stream/reactive/reactor/core/scheduler/SingleTimedScheduler.java @@ -0,0 +1,464 @@ +/* + * Copyright (c) 2011-2016 Pivotal Software Inc, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.cloud.stream.reactive.reactor.core.scheduler; + +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import reactor.core.Cancellation; +import reactor.core.publisher.Operators; +import reactor.core.scheduler.TimedScheduler; +import reactor.util.concurrent.OpenHashSet; + +/** + * A TimedScheduler with an embedded, single-threaded ScheduledExecutorService, + * shared among all workers. + */ +final class SingleTimedScheduler implements TimedScheduler { + + static final AtomicLong COUNTER = new AtomicLong(); + + final ScheduledThreadPoolExecutor executor; + + /** + * Constructs a new SingleTimedScheduler with the given thread factory. + * @param threadFactory the thread factory to use + */ + SingleTimedScheduler(ThreadFactory threadFactory) { + ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1, threadFactory); + e.setRemoveOnCancelPolicy(true); + executor = e; + } + + @Override + public Cancellation schedule(Runnable task) { + try { + Future f = executor.submit(task); + return () -> f.cancel(false); + } catch (RejectedExecutionException ex) { + return REJECTED; + } + } + + @Override + public Cancellation schedule(Runnable task, long delay, TimeUnit unit) { + try { + Future f = executor.schedule(task, delay, unit); + return () -> f.cancel(false); + } catch (RejectedExecutionException ex) { + return REJECTED; + } + } + + @Override + public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { + try { + Future f = executor.scheduleAtFixedRate(task, initialDelay, period, unit); + return () -> f.cancel(false); + } catch (RejectedExecutionException ex) { + return REJECTED; + } + } + + @Override + public void start() { + throw new UnsupportedOperationException("Not supported, yet."); + } + + @Override + public void shutdown() { + executor.shutdownNow(); + } + + @Override + public TimedWorker createWorker() { + return new SingleTimedSchedulerWorker(executor); + } + + static final class SingleTimedSchedulerWorker implements TimedWorker { + final ScheduledThreadPoolExecutor executor; + + OpenHashSet tasks; + + volatile boolean terminated; + + public SingleTimedSchedulerWorker(ScheduledThreadPoolExecutor executor) { + this.executor = executor; + this.tasks = new OpenHashSet<>(); + } + + @Override + public Cancellation schedule(Runnable task) { + if (terminated) { + return REJECTED; + } + + TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this); + + synchronized (this) { + if (terminated) { + return REJECTED; + } + + tasks.add(sr); + } + + try { + Future f = executor.submit(sr); + sr.set(f); + } catch (RejectedExecutionException ex) { + sr.dispose(); + return REJECTED; + } + + return sr; + } + + void delete(CancelFuture r) { + synchronized (this) { + if (!terminated) { + tasks.remove(r); + } + } + } + + @Override + public Cancellation schedule(Runnable task, long delay, TimeUnit unit) { + if (terminated) { + return REJECTED; + } + + TimedScheduledRunnable sr = new TimedScheduledRunnable(task, this); + + synchronized (this) { + if (terminated) { + return REJECTED; + } + + tasks.add(sr); + } + + try { + Future f = executor.schedule(sr, delay, unit); + sr.set(f); + } catch (RejectedExecutionException ex) { + sr.dispose(); + return REJECTED; + } + + return sr; + } + + @Override + public Cancellation schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) { + if (terminated) { + return REJECTED; + } + + TimedPeriodicScheduledRunnable sr = new TimedPeriodicScheduledRunnable(task, this); + + synchronized (this) { + if (terminated) { + return REJECTED; + } + + tasks.add(sr); + } + + try { + Future f = executor.scheduleAtFixedRate(sr, initialDelay, period, unit); + sr.set(f); + } catch (RejectedExecutionException ex) { + sr.dispose(); + return REJECTED; + } + + return sr; + } + + @Override + public void shutdown() { + if (terminated) { + return; + } + terminated = true; + + OpenHashSet set; + + synchronized (this) { + set = tasks; + if (set == null) { + return; + } + tasks = null; + } + + if (!set.isEmpty()) { + Object[] keys = set.keys(); + for (Object c : keys) { + if (c != null) { + ((CancelFuture)c).cancelFuture(); + } + } + } + } + } + + interface CancelFuture { + void cancelFuture(); + } + + static final class TimedScheduledRunnable + extends AtomicReference> implements Runnable, Cancellation, CancelFuture { + /** */ + private static final long serialVersionUID = 2284024836904862408L; + + final Runnable task; + + final SingleTimedSchedulerWorker parent; + + volatile Thread current; + static final AtomicReferenceFieldUpdater CURRENT = + AtomicReferenceFieldUpdater.newUpdater(TimedScheduledRunnable.class, Thread.class, "current"); + + static final Runnable EMPTY = new Runnable() { + @Override + public void run() { + + } + }; + + static final Future CANCELLED_FUTURE = new FutureTask<>(EMPTY, null); + + static final Future FINISHED = new FutureTask<>(EMPTY, null); + + public TimedScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) { + this.task = task; + this.parent = parent; + } + + @Override + public void run() { + CURRENT.lazySet(this, Thread.currentThread()); + try { + try { + task.run(); + } catch (Throwable e) { + Operators.onErrorDropped(e); + } + } finally { + for (;;) { + Future a = get(); + if (a == CANCELLED_FUTURE) { + break; + } + if (compareAndSet(a, FINISHED)) { + if (a != null) { + doCancel(a); + } + parent.delete(this); + break; + } + } + CURRENT.lazySet(this, null); + } + } + + void doCancel(Future a) { + a.cancel(Thread.currentThread() != current); + } + + @Override + public void cancelFuture() { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (compareAndSet(a, CANCELLED_FUTURE)) { + if (a != null) { + doCancel(a); + } + return; + } + } + } + + @Override + public void dispose() { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (compareAndSet(a, CANCELLED_FUTURE)) { + if (a != null) { + doCancel(a); + } + parent.delete(this); + return; + } + } + } + + + void setFuture(Future f) { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (a == CANCELLED_FUTURE) { + doCancel(a); + return; + } + if (compareAndSet(null, f)) { + return; + } + } + } + + @Override + public String toString() { + return "TimedScheduledRunnable[cancelled=" + (get() == CANCELLED_FUTURE) + + ", task=" + task + + "]"; + } + } + + static final class TimedPeriodicScheduledRunnable + extends AtomicReference> + implements Runnable, Cancellation, CancelFuture { + /** */ + private static final long serialVersionUID = 2284024836904862408L; + + final Runnable task; + + final SingleTimedSchedulerWorker parent; + + volatile Thread current; + static final AtomicReferenceFieldUpdater CURRENT = + AtomicReferenceFieldUpdater.newUpdater(TimedPeriodicScheduledRunnable.class, Thread.class, "current"); + + static final Runnable EMPTY = new Runnable() { + @Override + public void run() { + + } + }; + + static final Future CANCELLED_FUTURE = new FutureTask<>(EMPTY, null); + + static final Future FINISHED = new FutureTask<>(EMPTY, null); + + public TimedPeriodicScheduledRunnable(Runnable task, SingleTimedSchedulerWorker parent) { + this.task = task; + this.parent = parent; + } + + @Override + public void run() { + CURRENT.lazySet(this, Thread.currentThread()); + try { + try { + task.run(); + } catch (Throwable ex) { + Operators.onErrorDropped(ex); + for (;;) { + Future a = get(); + if (a == CANCELLED_FUTURE) { + break; + } + if (compareAndSet(a, FINISHED)) { + parent.delete(this); + break; + } + } + } + } finally { + CURRENT.lazySet(this, null); + } + } + + void doCancel(Future a) { + a.cancel(false); + } + + @Override + public void cancelFuture() { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (compareAndSet(a, CANCELLED_FUTURE)) { + if (a != null) { + doCancel(a); + } + return; + } + } + } + + @Override + public void dispose() { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (compareAndSet(a, CANCELLED_FUTURE)) { + if (a != null) { + doCancel(a); + } + parent.delete(this); + return; + } + } + } + + + void setFuture(Future f) { + for (;;) { + Future a = get(); + if (a == FINISHED) { + return; + } + if (a == CANCELLED_FUTURE) { + doCancel(a); + return; + } + if (compareAndSet(null, f)) { + return; + } + } + } + + @Override + public String toString() { + return "TimedPeriodicScheduledRunnable[cancelled=" + get() + ", task=" + task + "]"; + } + } + +} diff --git a/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerInterruptionTests.java b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerInterruptionTests.java new file mode 100644 index 000000000..1a8aa6a50 --- /dev/null +++ b/spring-cloud-stream-reactive/src/test/java/org/springframework/cloud/stream/reactive/StreamListenerInterruptionTests.java @@ -0,0 +1,78 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.reactive; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import reactor.core.publisher.Flux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.Input; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test validating that a fix for + * is present. + * + * @author Marius Bogoevici + */ +public class StreamListenerInterruptionTests { + + @Test + public void testSubscribersNotInterrupted() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestTimeWindows.class, + "--server.port=0"); + Sink sink = context.getBean(Sink.class); + TestTimeWindows testTimeWindows = context.getBean(TestTimeWindows.class); + sink.input().send(MessageBuilder.withPayload("hello1").build()); + sink.input().send(MessageBuilder.withPayload("hello2").build()); + sink.input().send(MessageBuilder.withPayload("hello3").build()); + assertThat(testTimeWindows.latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(testTimeWindows.interruptionState).isNotNull(); + assertThat(testTimeWindows.interruptionState).isFalse(); + context.close(); + } + + @EnableBinding(Sink.class) + @EnableAutoConfiguration + public static class TestTimeWindows { + + public CountDownLatch latch = new CountDownLatch(1); + + public Boolean interruptionState; + + @StreamListener + public void receive(@Input(Sink.INPUT) Flux input) { + input.window(Duration.ofMillis(500), Duration.ofMillis(100)) + .flatMap(w -> w.reduce("", (x, y) -> x + y)) + .subscribe(x -> { + interruptionState = Thread.currentThread().isInterrupted(); + latch.countDown(); + }); + } + } +} diff --git a/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml b/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml new file mode 100644 index 000000000..54b366fd6 --- /dev/null +++ b/spring-cloud-stream-tools/src/main/resources/checkstyle-suppressions.xml @@ -0,0 +1,7 @@ + + + + + \ No newline at end of file