diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java index d7ac0e8571..5d69b55d65 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java @@ -23,8 +23,6 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -85,16 +83,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac @Nullable private ExecutorService executor; - private final ReentrantLock pauseLock = new ReentrantLock(); - - private final Condition unpaused = this.pauseLock.newCondition(); - - private volatile boolean paused; - - private int executingTaskCount = 0; - @Nullable - private Runnable stopCallback; + private ExecutorLifecycleDelegate lifecycleDelegate; /** @@ -258,6 +248,7 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); + this.lifecycleDelegate = new ExecutorLifecycleDelegate(this.executor); } /** @@ -372,13 +363,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac */ @Override public void start() { - this.pauseLock.lock(); - try { - this.paused = false; - this.unpaused.signalAll(); - } - finally { - this.pauseLock.unlock(); + if (this.lifecycleDelegate != null) { + this.lifecycleDelegate.start(); } } @@ -388,13 +374,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac */ @Override public void stop() { - this.pauseLock.lock(); - try { - this.paused = true; - this.stopCallback = null; - } - finally { - this.pauseLock.unlock(); + if (this.lifecycleDelegate != null) { + this.lifecycleDelegate.stop(); } } @@ -405,19 +386,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac */ @Override public void stop(Runnable callback) { - this.pauseLock.lock(); - try { - this.paused = true; - if (this.executingTaskCount == 0) { - this.stopCallback = null; - callback.run(); - } - else { - this.stopCallback = callback; - } - } - finally { - this.pauseLock.unlock(); + if (this.lifecycleDelegate != null) { + this.lifecycleDelegate.stop(callback); } } @@ -429,7 +399,7 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac */ @Override public boolean isRunning() { - return (this.executor != null && !this.executor.isShutdown() & !this.paused); + return (this.lifecycleDelegate != null && this.lifecycleDelegate.isRunning()); } /** @@ -442,18 +412,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac * @see ThreadPoolExecutor#beforeExecute(Thread, Runnable) */ protected void beforeExecute(Thread thread, Runnable task) { - this.pauseLock.lock(); - try { - while (this.paused && this.executor != null && !this.executor.isShutdown()) { - this.unpaused.await(); - } - } - catch (InterruptedException ex) { - thread.interrupt(); - } - finally { - this.executingTaskCount++; - this.pauseLock.unlock(); + if (this.lifecycleDelegate != null) { + this.lifecycleDelegate.beforeExecute(thread); } } @@ -467,19 +427,8 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac * @see ThreadPoolExecutor#afterExecute(Runnable, Throwable) */ protected void afterExecute(Runnable task, @Nullable Throwable ex) { - this.pauseLock.lock(); - try { - this.executingTaskCount--; - if (this.executingTaskCount == 0) { - Runnable callback = this.stopCallback; - if (callback != null) { - callback.run(); - this.stopCallback = null; - } - } - } - finally { - this.pauseLock.unlock(); + if (this.lifecycleDelegate != null) { + this.lifecycleDelegate.afterExecute(); } } diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorLifecycleDelegate.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorLifecycleDelegate.java new file mode 100644 index 0000000000..3f449f12bd --- /dev/null +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ExecutorLifecycleDelegate.java @@ -0,0 +1,136 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import org.springframework.context.SmartLifecycle; +import org.springframework.lang.Nullable; + +/** + * An internal delegate for common {@link ExecutorService} lifecycle management + * with pause/resume support. + * + * @author Juergen Hoeller + * @since 6.1 + * @see ExecutorConfigurationSupport + * @see SimpleAsyncTaskScheduler + */ +final class ExecutorLifecycleDelegate implements SmartLifecycle { + + private final ExecutorService executor; + + private final ReentrantLock pauseLock = new ReentrantLock(); + + private final Condition unpaused = this.pauseLock.newCondition(); + + private volatile boolean paused; + + private int executingTaskCount = 0; + + @Nullable + private Runnable stopCallback; + + + public ExecutorLifecycleDelegate(ExecutorService executor) { + this.executor = executor; + } + + + @Override + public void start() { + this.pauseLock.lock(); + try { + this.paused = false; + this.unpaused.signalAll(); + } + finally { + this.pauseLock.unlock(); + } + } + + @Override + public void stop() { + this.pauseLock.lock(); + try { + this.paused = true; + this.stopCallback = null; + } + finally { + this.pauseLock.unlock(); + } + } + + @Override + public void stop(Runnable callback) { + this.pauseLock.lock(); + try { + this.paused = true; + if (this.executingTaskCount == 0) { + this.stopCallback = null; + callback.run(); + } + else { + this.stopCallback = callback; + } + } + finally { + this.pauseLock.unlock(); + } + } + + @Override + public boolean isRunning() { + return (!this.executor.isShutdown() & !this.paused); + } + + void beforeExecute(Thread thread) { + this.pauseLock.lock(); + try { + while (this.paused && !this.executor.isShutdown()) { + this.unpaused.await(); + } + } + catch (InterruptedException ex) { + thread.interrupt(); + } + finally { + this.executingTaskCount++; + this.pauseLock.unlock(); + } + } + + void afterExecute() { + this.pauseLock.lock(); + try { + this.executingTaskCount--; + if (this.executingTaskCount == 0) { + Runnable callback = this.stopCallback; + if (callback != null) { + callback.run(); + this.stopCallback = null; + } + } + } + finally { + this.pauseLock.unlock(); + } + } + +} diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java new file mode 100644 index 0000000000..8a8b7f5310 --- /dev/null +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java @@ -0,0 +1,280 @@ +/* + * Copyright 2002-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.scheduling.concurrent; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Executor; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.context.ApplicationListener; +import org.springframework.context.SmartLifecycle; +import org.springframework.context.event.ContextClosedEvent; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskRejectedException; +import org.springframework.lang.Nullable; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.Trigger; +import org.springframework.scheduling.support.TaskUtils; +import org.springframework.util.ErrorHandler; + +/** + * A simple implementation of Spring's {@link TaskScheduler} interface, using + * a single scheduler thread and executing every scheduled task in an individual + * separate thread. This is an attractive choice with virtual threads on JDK 21, + * so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}. + * + *

Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable + * replacement for it, e.g. as a single shared instance serving as a + * {@link org.springframework.core.task.TaskExecutor} as well as a {@link TaskScheduler}. + * This is generally not the case with other executor/scheduler implementations + * which tend to have specific constraints for the scheduler thread pool, + * requiring a separate thread pool for general executor purposes in practice. + * + *

As an alternative to the built-in thread-per-task capability, this scheduler + * can also be configured with a separate target executor for scheduled task + * execution through {@link #setTargetTaskExecutor}: e.g. pointing to a shared + * {@link ThreadPoolTaskExecutor} bean. This is still rather different from a + * {@link ThreadPoolTaskScheduler} setup since it always uses a single scheduler + * thread while dynamically dispatching to the target thread pool which may have + * a dynamic core/max pool size range, participating in a shared concurrency limit. + * + * @author Juergen Hoeller + * @since 6.1 + * @see #setVirtualThreads + * @see #setTargetTaskExecutor + * @see SimpleAsyncTaskExecutor + * @see ThreadPoolTaskScheduler + */ +@SuppressWarnings("serial") +public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler, + ApplicationContextAware, SmartLifecycle, ApplicationListener, AutoCloseable { + + private static final TimeUnit NANO = TimeUnit.NANOSECONDS; + + + private final ScheduledExecutorService scheduledExecutor = createScheduledExecutor(); + + private final ExecutorLifecycleDelegate lifecycleDelegate = new ExecutorLifecycleDelegate(this.scheduledExecutor); + + private Clock clock = Clock.systemDefaultZone(); + + private int phase = DEFAULT_PHASE; + + @Nullable + private Executor targetTaskExecutor; + + @Nullable + private ApplicationContext applicationContext; + + + /** + * Set the clock to use for scheduling purposes. + *

The default clock is the system clock for the default time zone. + * @since 5.3 + * @see Clock#systemDefaultZone() + */ + public void setClock(Clock clock) { + this.clock = clock; + } + + @Override + public Clock getClock() { + return this.clock; + } + + /** + * Specify the lifecycle phase for pausing and resuming this executor. + * The default is {@link #DEFAULT_PHASE}. + * @see SmartLifecycle#getPhase() + */ + public void setPhase(int phase) { + this.phase = phase; + } + + /** + * Return the lifecycle phase for pausing and resuming this executor. + * @see #setPhase + */ + @Override + public int getPhase() { + return this.phase; + } + + /** + * Specify a custom target {@link Executor} to delegate to for + * the individual execution of scheduled tasks. This can for example + * be set to a separate thread pool for executing scheduled tasks, + * whereas this scheduler keeps using its single scheduler thread. + *

If not set, the regular {@link SimpleAsyncTaskExecutor} + * arrangements kicks in with a new thread per task. + */ + public void setTargetTaskExecutor(Executor targetTaskExecutor) { + this.targetTaskExecutor = (targetTaskExecutor == this ? null : targetTaskExecutor); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } + + + private ScheduledExecutorService createScheduledExecutor() { + return new ScheduledThreadPoolExecutor(1, this::newThread) { + @Override + protected void beforeExecute(Thread thread, Runnable task) { + lifecycleDelegate.beforeExecute(thread); + } + @Override + protected void afterExecute(Runnable task, Throwable ex) { + lifecycleDelegate.afterExecute(); + } + }; + } + + @Override + protected void doExecute(Runnable task) { + if (this.targetTaskExecutor != null) { + this.targetTaskExecutor.execute(task); + } + else { + super.doExecute(task); + } + } + + private Runnable scheduledTask(Runnable task) { + return () -> execute(task); + } + + + @Override + @Nullable + public ScheduledFuture schedule(Runnable task, Trigger trigger) { + try { + Runnable delegate = scheduledTask(task); + ErrorHandler errorHandler = TaskUtils.getDefaultErrorHandler(true); + return new ReschedulingRunnable( + delegate, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule(); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + @Override + public ScheduledFuture schedule(Runnable task, Instant startTime) { + Duration delay = Duration.between(this.clock.instant(), startTime); + try { + return this.scheduledExecutor.schedule(scheduledTask(task), NANO.convert(delay), NANO); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) { + Duration initialDelay = Duration.between(this.clock.instant(), startTime); + try { + return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task), + NANO.convert(initialDelay), NANO.convert(period), NANO); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + @Override + public ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period) { + try { + return this.scheduledExecutor.scheduleAtFixedRate(scheduledTask(task), + 0, NANO.convert(period), NANO); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Instant startTime, Duration delay) { + Duration initialDelay = Duration.between(this.clock.instant(), startTime); + try { + return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task), + NANO.convert(initialDelay), NANO.convert(delay), NANO); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable task, Duration delay) { + try { + return this.scheduledExecutor.scheduleWithFixedDelay(scheduledTask(task), + 0, NANO.convert(delay), NANO); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException(this.scheduledExecutor, task, ex); + } + } + + + @Override + public void start() { + this.lifecycleDelegate.start(); + } + + @Override + public void stop() { + this.lifecycleDelegate.stop(); + } + + @Override + public void stop(Runnable callback) { + this.lifecycleDelegate.stop(callback); + } + + @Override + public boolean isRunning() { + return this.lifecycleDelegate.isRunning(); + } + + @Override + public void onApplicationEvent(ContextClosedEvent event) { + if (event.getApplicationContext() == this.applicationContext) { + this.scheduledExecutor.shutdown(); + } + } + + @Override + public void close() { + for (Runnable remainingTask : this.scheduledExecutor.shutdownNow()) { + if (remainingTask instanceof Future future) { + future.cancel(true); + } + } + } + +} diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index fd1a8aba4c..9ab42bae99 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -46,8 +46,9 @@ import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureTask; /** - * Implementation of Spring's {@link TaskScheduler} interface, wrapping - * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor}. + * A standard implementation of Spring's {@link TaskScheduler} interface, wrapping + * a native {@link java.util.concurrent.ScheduledThreadPoolExecutor} and providing + * all applicable configuration options for it. * * @author Juergen Hoeller * @author Mark Fisher diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java index 82f0118222..73e7e8ec87 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -33,6 +35,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import org.springframework.core.testfixture.EnabledForTestGroups; import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.IntervalTask; import org.springframework.scheduling.config.ScheduledTaskHolder; @@ -47,6 +50,7 @@ import static org.springframework.core.testfixture.TestGroup.LONG_RUNNING; * * @author Chris Beams * @author Sam Brannen + * @author Juergen Hoeller * @since 3.1 */ public class EnableSchedulingTests { @@ -62,39 +66,31 @@ public class EnableSchedulingTests { } - @Test + @ParameterizedTest + @ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class}) @EnabledForTestGroups(LONG_RUNNING) - public void withFixedRateTask() throws InterruptedException { - ctx = new AnnotationConfigApplicationContext(FixedRateTaskConfig.class); + public void withFixedRateTask(Class configClass) throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(configClass); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(2); Thread.sleep(110); assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); } - @Test + @ParameterizedTest + @ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class}) @EnabledForTestGroups(LONG_RUNNING) - public void withSubclass() throws InterruptedException { - ctx = new AnnotationConfigApplicationContext(FixedRateTaskConfigSubclass.class); - assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(2); - - Thread.sleep(110); - assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); - } - - @Test - @EnabledForTestGroups(LONG_RUNNING) - public void withExplicitScheduler() throws InterruptedException { - ctx = new AnnotationConfigApplicationContext(ExplicitSchedulerConfig.class); + public void withExplicitScheduler(Class configClass) throws InterruptedException { + ctx = new AnnotationConfigApplicationContext(configClass); assertThat(ctx.getBean(ScheduledTaskHolder.class).getScheduledTasks()).hasSize(1); Thread.sleep(110); ctx.stop(); int count1 = ctx.getBean(AtomicInteger.class).get(); - assertThat(count1).isGreaterThanOrEqualTo(10); + assertThat(count1).isGreaterThanOrEqualTo(10).isLessThan(20); Thread.sleep(110); int count2 = ctx.getBean(AtomicInteger.class).get(); - assertThat(count2).isEqualTo(count1); + assertThat(count2).isGreaterThanOrEqualTo(10).isLessThan(20); ctx.start(); Thread.sleep(110); int count3 = ctx.getBean(AtomicInteger.class).get(); @@ -241,7 +237,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler myTaskScheduler() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler-"); return scheduler; } @@ -259,13 +255,26 @@ public class EnableSchedulingTests { } + @Configuration + static class ExplicitSchedulerConfigSubclass extends ExplicitSchedulerConfig { + + @Bean + @Override + public TaskScheduler myTaskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setThreadNamePrefix("explicitScheduler-"); + return scheduler; + } + } + + @Configuration @EnableScheduling static class AmbiguousExplicitSchedulerConfig { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -291,7 +300,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -329,7 +338,7 @@ public class EnableSchedulingTests { @Bean @Qualifier("myScheduler") public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -362,7 +371,7 @@ public class EnableSchedulingTests { @Bean @Qualifier("myScheduler") public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -402,7 +411,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -426,7 +435,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); return scheduler; } @@ -467,7 +476,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); return scheduler; } @@ -497,7 +506,7 @@ public class EnableSchedulingTests { @Bean public TaskScheduler taskScheduler1() { - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); return scheduler; } diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index 008de2f646..c4fc4374f9 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -31,17 +31,19 @@ import org.springframework.util.concurrent.ListenableFutureTask; /** * {@link TaskExecutor} implementation that fires up a new Thread for each task, - * executing it asynchronously. + * executing it asynchronously. Supports a virtual thread option on JDK 21. * *

Supports limiting concurrent threads through the "concurrencyLimit" * bean property. By default, the number of concurrent threads is unlimited. * *

NOTE: This implementation does not reuse threads! Consider a * thread-pooling TaskExecutor implementation instead, in particular for - * executing a large number of short-lived tasks. + * executing a large number of short-lived tasks. Alternatively, on JDK 21, + * consider setting {@link #setVirtualThreads} to {@code true}. * * @author Juergen Hoeller * @since 2.0 + * @see #setVirtualThreads * @see #setConcurrencyLimit * @see SyncTaskExecutor * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor @@ -251,18 +253,28 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator * Template method for the actual execution of a task. *

The default implementation creates a new Thread and starts it. * @param task the Runnable to execute + * @see #newThread + * @see Thread#start() + */ + protected void doExecute(Runnable task) { + newThread(task).start(); + } + + /** + * Create a new Thread for the given task. + * @param task the Runnable to create a Thread for + * @return the new Thread instance + * @since 6.1 * @see #setVirtualThreads * @see #setThreadFactory * @see #createThread - * @see java.lang.Thread#start() */ - protected void doExecute(Runnable task) { + protected Thread newThread(Runnable task) { if (this.virtualThreadDelegate != null) { - this.virtualThreadDelegate.startVirtualThread(nextThreadName(), task); + return this.virtualThreadDelegate.newVirtualThread(nextThreadName(), task); } else { - Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); - thread.start(); + return (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); } } diff --git a/spring-core/src/main/java/org/springframework/core/task/VirtualThreadDelegate.java b/spring-core/src/main/java/org/springframework/core/task/VirtualThreadDelegate.java index ade9a548c2..6f32b88e61 100644 --- a/spring-core/src/main/java/org/springframework/core/task/VirtualThreadDelegate.java +++ b/spring-core/src/main/java/org/springframework/core/task/VirtualThreadDelegate.java @@ -26,7 +26,7 @@ import java.util.concurrent.ThreadFactory; * @since 6.1 * @see VirtualThreadTaskExecutor */ -class VirtualThreadDelegate { +final class VirtualThreadDelegate { public VirtualThreadDelegate() { throw new UnsupportedOperationException("Virtual threads not supported on JDK <21"); @@ -40,7 +40,7 @@ class VirtualThreadDelegate { throw new UnsupportedOperationException(); } - public Thread startVirtualThread(String name, Runnable task) { + public Thread newVirtualThread(String name, Runnable task) { throw new UnsupportedOperationException(); } diff --git a/spring-core/src/main/java/org/springframework/core/task/VirtualThreadTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/VirtualThreadTaskExecutor.java index ae89c2d75f..b246428f1a 100644 --- a/spring-core/src/main/java/org/springframework/core/task/VirtualThreadTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/VirtualThreadTaskExecutor.java @@ -27,7 +27,7 @@ import java.util.concurrent.ThreadFactory; * * @author Juergen Hoeller * @since 6.1 - * @see SimpleAsyncTaskExecutor + * @see SimpleAsyncTaskExecutor#setVirtualThreads */ public class VirtualThreadTaskExecutor implements AsyncTaskExecutor { diff --git a/spring-core/src/main/java21/org/springframework/core/task/VirtualThreadDelegate.java b/spring-core/src/main/java21/org/springframework/core/task/VirtualThreadDelegate.java index 5971094b35..db94db9fa2 100644 --- a/spring-core/src/main/java21/org/springframework/core/task/VirtualThreadDelegate.java +++ b/spring-core/src/main/java21/org/springframework/core/task/VirtualThreadDelegate.java @@ -26,7 +26,7 @@ import java.util.concurrent.ThreadFactory; * @since 6.1 * @see VirtualThreadTaskExecutor */ -class VirtualThreadDelegate { +final class VirtualThreadDelegate { private final Thread.Builder threadBuilder = Thread.ofVirtual(); @@ -38,8 +38,8 @@ class VirtualThreadDelegate { return this.threadBuilder.name(threadNamePrefix, 0).factory(); } - public Thread startVirtualThread(String name, Runnable task) { - return this.threadBuilder.name(name).start(task); + public Thread newVirtualThread(String name, Runnable task) { + return this.threadBuilder.name(name).unstarted(task); } }