Introduce SimpleAsyncTaskScheduler (extending SimpleAsyncTaskExecutor)

Closes gh-30956
This commit is contained in:
Juergen Hoeller
2023-07-26 23:56:59 +02:00
parent 5e4ed68fdd
commit 7681200ee7
9 changed files with 493 additions and 106 deletions

View File

@@ -23,8 +23,6 @@ import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,16 +83,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
@Nullable
private ExecutorService executor;
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = this.pauseLock.newCondition();
private volatile boolean paused;
private int executingTaskCount = 0;
@Nullable
private Runnable stopCallback;
private ExecutorLifecycleDelegate lifecycleDelegate;
/**
@@ -258,6 +248,7 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
this.lifecycleDelegate = new ExecutorLifecycleDelegate(this.executor);
}
/**
@@ -372,13 +363,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
*/
@Override
public void start() {
this.pauseLock.lock();
try {
this.paused = false;
this.unpaused.signalAll();
}
finally {
this.pauseLock.unlock();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.start();
}
}
@@ -388,13 +374,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
*/
@Override
public void stop() {
this.pauseLock.lock();
try {
this.paused = true;
this.stopCallback = null;
}
finally {
this.pauseLock.unlock();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.stop();
}
}
@@ -405,19 +386,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
*/
@Override
public void stop(Runnable callback) {
this.pauseLock.lock();
try {
this.paused = true;
if (this.executingTaskCount == 0) {
this.stopCallback = null;
callback.run();
}
else {
this.stopCallback = callback;
}
}
finally {
this.pauseLock.unlock();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.stop(callback);
}
}
@@ -429,7 +399,7 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
*/
@Override
public boolean isRunning() {
return (this.executor != null && !this.executor.isShutdown() & !this.paused);
return (this.lifecycleDelegate != null && this.lifecycleDelegate.isRunning());
}
/**
@@ -442,18 +412,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
* @see ThreadPoolExecutor#beforeExecute(Thread, Runnable)
*/
protected void beforeExecute(Thread thread, Runnable task) {
this.pauseLock.lock();
try {
while (this.paused && this.executor != null && !this.executor.isShutdown()) {
this.unpaused.await();
}
}
catch (InterruptedException ex) {
thread.interrupt();
}
finally {
this.executingTaskCount++;
this.pauseLock.unlock();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.beforeExecute(thread);
}
}
@@ -467,19 +427,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
* @see ThreadPoolExecutor#afterExecute(Runnable, Throwable)
*/
protected void afterExecute(Runnable task, @Nullable Throwable ex) {
this.pauseLock.lock();
try {
this.executingTaskCount--;
if (this.executingTaskCount == 0) {
Runnable callback = this.stopCallback;
if (callback != null) {
callback.run();
this.stopCallback = null;
}
}
}
finally {
this.pauseLock.unlock();
if (this.lifecycleDelegate != null) {
this.lifecycleDelegate.afterExecute();
}
}

View File

@@ -0,0 +1,136 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.context.SmartLifecycle;
import org.springframework.lang.Nullable;
/**
* An internal delegate for common {@link ExecutorService} lifecycle management
* with pause/resume support.
*
* @author Juergen Hoeller
* @since 6.1
* @see ExecutorConfigurationSupport
* @see SimpleAsyncTaskScheduler
*/
final class ExecutorLifecycleDelegate implements SmartLifecycle {
private final ExecutorService executor;
private final ReentrantLock pauseLock = new ReentrantLock();
private final Condition unpaused = this.pauseLock.newCondition();
private volatile boolean paused;
private int executingTaskCount = 0;
@Nullable
private Runnable stopCallback;
public ExecutorLifecycleDelegate(ExecutorService executor) {
this.executor = executor;
}
@Override
public void start() {
this.pauseLock.lock();
try {
this.paused = false;
this.unpaused.signalAll();
}
finally {
this.pauseLock.unlock();
}
}
@Override
public void stop() {
this.pauseLock.lock();
try {
this.paused = true;
this.stopCallback = null;
}
finally {
this.pauseLock.unlock();
}
}
@Override
public void stop(Runnable callback) {
this.pauseLock.lock();
try {
this.paused = true;
if (this.executingTaskCount == 0) {
this.stopCallback = null;
callback.run();
}
else {
this.stopCallback = callback;
}
}
finally {
this.pauseLock.unlock();
}
}
@Override
public boolean isRunning() {
return (!this.executor.isShutdown() & !this.paused);
}
void beforeExecute(Thread thread) {
this.pauseLock.lock();
try {
while (this.paused && !this.executor.isShutdown()) {
this.unpaused.await();
}
}
catch (InterruptedException ex) {
thread.interrupt();
}
finally {
this.executingTaskCount++;
this.pauseLock.unlock();
}
}
void afterExecute() {
this.pauseLock.lock();
try {
this.executingTaskCount--;
if (this.executingTaskCount == 0) {
Runnable callback = this.stopCallback;
if (callback != null) {
callback.run();
this.stopCallback = null;
}
}
}
finally {
this.pauseLock.unlock();
}
}
}

View File

@@ -0,0 +1,280 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.ErrorHandler;
/**
* A simple implementation of Spring's {@link TaskScheduler} interface, using
* a single scheduler thread and executing every scheduled task in an individual
* separate thread. This is an attractive choice with virtual threads on JDK 21,
* so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}.
*
* <p>Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable
* replacement for it, e.g. as a single shared instance serving as a
* {@link org.springframework.core.task.TaskExecutor} as well as a {@link TaskScheduler}.
* This is generally not the case with other executor/scheduler implementations
* which tend to have specific constraints for the scheduler thread pool,
* requiring a separate thread pool for general executor purposes in practice.
*
* <p>As an alternative to the built-in thread-per-task capability, this scheduler
* can also be configured with a separate target executor for scheduled task
* execution through {@link #setTargetTaskExecutor}: e.g. pointing to a shared
* {@link ThreadPoolTaskExecutor} bean. This is still rather different from a
* {@link ThreadPoolTaskScheduler} setup since it always uses a single scheduler
* thread while dynamically dispatching to the target thread pool which may have
* a dynamic core/max pool size range, participating in a shared concurrency limit.
*
* @author Juergen Hoeller
* @since 6.1
* @see #setVirtualThreads
* @see #setTargetTaskExecutor
* @see SimpleAsyncTaskExecutor
* @see ThreadPoolTaskScheduler
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler,
ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent>, AutoCloseable {
private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
private final ScheduledExecutorService scheduledExecutor = createScheduledExecutor();
private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor);
private Clock clock = Clock.systemDefaultZone();
private int phase = DEFAULT_PHASE;
@Nullable
private Executor targetTaskExecutor;
@Nullable
private ApplicationContext applicationContext;
/**
* Set the clock to use for scheduling purposes.
* <p>The default clock is the system clock for the default time zone.
* @since 5.3
* @see Clock#systemDefaultZone()
*/
public void setClock(Clock clock) {
this.clock = clock;
}
@Override
public Clock getClock() {
return this.clock;
}
/**
* Specify the lifecycle phase for pausing and resuming this executor.
* The default is {@link #DEFAULT_PHASE}.
* @see SmartLifecycle#getPhase()
*/
public void setPhase(int phase) {
this.phase = phase;
}
/**
* Return the lifecycle phase for pausing and resuming this executor.
* @see #setPhase
*/
@Override
public int getPhase() {
return this.phase;
}
/**
* Specify a custom target {@link Executor} to delegate to for
* the individual execution of scheduled tasks. This can for example
* be set to a separate thread pool for executing scheduled tasks,
* whereas this scheduler keeps using its single scheduler thread.
* <p>If not set, the regular {@link SimpleAsyncTaskExecutor}
* arrangements kicks in with a new thread per task.
*/
public void setTargetTaskExecutor(Executor targetTaskExecutor) {
this.targetTaskExecutor = (targetTaskExecutor == this ? null : targetTaskExecutor);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
private ScheduledExecutorService createScheduledExecutor() {
return new ScheduledThreadPoolExecutor(1, this::newThread) {
@Override
protected void beforeExecute(Thread thread, Runnable task) {
lifecycleDelegate.beforeExecute(thread);
}
@Override
protected void afterExecute(Runnable task, Throwable ex) {
lifecycleDelegate.afterExecute();
}
};
}
@Override
protected void doExecute(Runnable task) {
if (this.targetTaskExecutor != null) {
this.targetTaskExecutor.execute(task);
}
else {
super.doExecute(task);
}
}
private Runnable scheduledTask(Runnable task) {
return () -> execute(task);
}
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
try {
Runnable delegate = scheduledTask(task);
ErrorHandler errorHandler = TaskUtils.getDefaultErrorHandler(true);
return new ReschedulingRunnable(
delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
Duration delay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
NANO.convert(initialDelay), NANO.convert(period), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Duration period) {
try {
return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task),
0, NANO.convert(period), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) {
Duration initialDelay = Duration.between(this.clock.instant(), startTime);
try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task),
NANO.convert(initialDelay), NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Duration delay) {
try {
return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task),
0, NANO.convert(delay), NANO);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(this.scheduledExecutor, task, ex);
}
}
@Override
public void start() {
this.lifecycleDelegate.start();
}
@Override
public void stop() {
this.lifecycleDelegate.stop();
}
@Override
public void stop(Runnable callback) {
this.lifecycleDelegate.stop(callback);
}
@Override
public boolean isRunning() {
return this.lifecycleDelegate.isRunning();
}
@Override
public void onApplicationEvent(ContextClosedEvent event) {
if (event.getApplicationContext() == this.applicationContext) {
this.scheduledExecutor.shutdown();
}
}
@Override
public void close() {
for (Runnable remainingTask : this.scheduledExecutor.shutdownNow()) {
if (remainingTask instanceof Future<?> future) {
future.cancel(true);
}
}
}
}

View File

@@ -46,8 +46,9 @@ import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
/**
* Implementation of Spring's {@link TaskScheduler} interface, wrapping
* a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}.
* A standard implementation of Spring's {@link TaskScheduler} interface, wrapping
* a native {@link java.util.concurrent.ScheduledThreadPoolExecutor} and providing
* all applicable configuration options for it.
*
* @author Juergen Hoeller
* @author Mark Fisher