Support @Scheduled fixedDelay/fixedRate on Publisher-returning methods

This commit adds support for `@Scheduled` annotation on reactive
methods and Kotlin suspending functions.

Reactive methods are methods that return a `Publisher` or a subclass
of `Publisher`. The `ReactiveAdapterRegistry` is used to support many
implementations, such as `Flux`, `Mono`, `Flow`, `Single`, etc.
Methods should not take any argument and published values will be
ignored, as they are already with synchronous support.

This is implemented in `ScheduledAnnotationReactiveSupport`, which
"converts" Publishers to `Runnable`. This strategy keeps track of
active Subscriptions in the `ScheduledAnnotationBeanPostProcessor`,
in order to cancel them all in case of shutdown.
The existing scheduling support for tasks is reused, aligning the
triggering behavior with the existing support: cron, fixedDelay and
fixedRate are all supported strategies.

If the `Publisher` errors, the exception is logged at warn level and
otherwise ignored. As a result new `Runnable` instances will be
created for each execution and scheduling will continue.
The only difference with synchronous support is that error signals
will not be thrown by those `Runnable` tasks and will not be made
available to the `org.springframework.util.ErrorHandler` contract.
This is due to the asynchronous and lazy nature of Publishers.

Closes gh-23533
Closes gh-28515
This commit is contained in:
Simon Baslé
2023-06-05 12:25:15 +02:00
committed by Brian Clozel
parent 53f891226e
commit 35052f2113
7 changed files with 877 additions and 4 deletions

View File

@@ -36,6 +36,20 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar;
* a {@code void} return type; if not, the returned value will be ignored
* when called through the scheduler.
*
* <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
* The {@code Publisher} MUST support multiple subsequent subscriptions (i.e. be cold).
* The returned Publisher is only produced once, and the scheduling infrastructure
* then periodically {@code subscribe()} to it according to configuration.
* Values emitted by the publisher are ignored. Errors are logged at WARN level, which
* doesn't prevent further iterations. If a {@code fixed delay} is configured, the
* subscription is blocked upon in order to respect the fixed delay semantics.
*
* <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
* bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
* used to adapt the suspending function into a {@code Publisher} which is treated
* the same way as in the reactive method case (see above).
*
* <p>Processing of {@code @Scheduled} annotations is performed by
* registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
* done manually or, more conveniently, through the {@code <task:annotation-driven/>}

View File

@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -98,6 +99,7 @@ import org.springframework.util.StringValueResolver;
* @author Elizabeth Chatman
* @author Victor Brown
* @author Sam Brannen
* @author Simon Baslé
* @since 3.0
* @see Scheduled
* @see EnableScheduling
@@ -143,6 +145,8 @@ public class ScheduledAnnotationBeanPostProcessor
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap<>(16);
private final Map<Object, List<Runnable>> reactiveSubscriptions = new IdentityHashMap<>(16);
/**
* Create a default {@code ScheduledAnnotationBeanPostProcessor}.
@@ -385,15 +389,33 @@ public class ScheduledAnnotationBeanPostProcessor
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean.
* Process the given {@code @Scheduled} method declaration on the given bean,
* attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive}
* methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
* @see #processScheduledSync(Scheduled, Method, Object)
* @see #processScheduledAsync(Scheduled, Method, Object)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
// Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledAsync(scheduled, method, bean);
return;
}
processScheduledSync(scheduled, method, bean);
}
/**
* Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable}
* accordingly. The Runnable can represent either a synchronous method invocation
* (see {@link #processScheduledSync(Scheduled, Method, Object)}) or an asynchronous
* one (see {@link #processScheduledAsync(Scheduled, Method, Object)}).
*/
protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
@@ -516,6 +538,53 @@ public class ScheduledAnnotationBeanPostProcessor
}
}
/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method MUST take no arguments. Its return value
* is ignored (if any) and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
* @see #createRunnable(Object, Method)
*/
protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = createRunnable(bean, method);
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly
* scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* MUST either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) {
Runnable task;
try {
task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled,
this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>()));
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
processScheduledTask(scheduled, task, method, bean);
}
/**
* Create a {@link Runnable} for the given bean instance,
* calling the specified scheduled method.
@@ -554,6 +623,8 @@ public class ScheduledAnnotationBeanPostProcessor
/**
* Return all currently scheduled tasks, from {@link Scheduled} methods
* as well as from programmatic {@link SchedulingConfigurer} interaction.
* <p>Note this includes upcoming scheduled subscriptions for reactive methods,
* but doesn't cover any currently active subscription for such methods.
* @since 5.0.2
*/
@Override
@@ -572,20 +643,27 @@ public class ScheduledAnnotationBeanPostProcessor
@Override
public void postProcessBeforeDestruction(Object bean, String beanName) {
Set<ScheduledTask> tasks;
List<Runnable> liveSubscriptions;
synchronized (this.scheduledTasks) {
tasks = this.scheduledTasks.remove(bean);
liveSubscriptions = this.reactiveSubscriptions.remove(bean);
}
if (tasks != null) {
for (ScheduledTask task : tasks) {
task.cancel();
}
}
if (liveSubscriptions != null) {
for (Runnable subscription : liveSubscriptions) {
subscription.run(); // equivalent to cancelling the subscription
}
}
}
@Override
public boolean requiresDestruction(Object bean) {
synchronized (this.scheduledTasks) {
return this.scheduledTasks.containsKey(bean);
return this.scheduledTasks.containsKey(bean) || this.reactiveSubscriptions.containsKey(bean);
}
}
@@ -599,6 +677,12 @@ public class ScheduledAnnotationBeanPostProcessor
}
}
this.scheduledTasks.clear();
Collection<List<Runnable>> allLiveSubscriptions = this.reactiveSubscriptions.values();
for (List<Runnable> liveSubscriptions : allLiveSubscriptions) {
for (Runnable liveSubscription : liveSubscriptions) {
liveSubscription.run(); //equivalent to cancelling the subscription
}
}
}
this.registrar.destroy();
}

View File

@@ -0,0 +1,268 @@
/*
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import org.springframework.aop.support.AopUtils;
import org.springframework.core.CoroutinesUtils;
import org.springframework.core.KotlinDetector;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive cases
* without a dependency on optional classes.
* @author Simon Baslé
* @since 6.1.0
*/
abstract class ScheduledAnnotationReactiveSupport {
static final boolean reactorPresent = ClassUtils.isPresent(
"reactor.core.publisher.Flux", ScheduledAnnotationReactiveSupport.class.getClassLoader());
static final boolean coroutinesReactorPresent = ClassUtils.isPresent(
"kotlinx.coroutines.reactor.MonoKt", ScheduledAnnotationReactiveSupport.class.getClassLoader());
private static final Log LOGGER = LogFactory.getLog(ScheduledAnnotationReactiveSupport.class);
/**
* Checks that if the method is reactive, it can be scheduled. Methods are considered
* eligible for reactive scheduling if they either return an instance of a type that
* can be converted to {@code Publisher} or are a Kotlin Suspending Function.
* If the method isn't matching these criteria then this check returns {@code false}.
* <p>For scheduling of Kotlin Suspending Functions, the Coroutine-Reactor bridge
* {@code kotlinx.coroutines.reactor} MUST be present at runtime (in order to invoke
* suspending functions as a {@code Publisher}).
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
* an {@code IllegalStateException}.
* @throws IllegalStateException if the method is reactive but Reactor and/or the
* Kotlin coroutines bridge are not present at runtime
*/
static boolean isReactive(Method method) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
//Note that suspending functions declared without args have a single Continuation parameter in reflective inspection
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be"
+ " annotated with @Scheduled if declared without arguments");
Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with"
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
return true;
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
if (!registry.hasAdapters()) {
return false;
}
Class<?> returnType = method.getReturnType();
ReactiveAdapter candidateAdapter = registry.getAdapter(returnType);
if (candidateAdapter == null) {
return false;
}
Assert.isTrue(method.getParameterCount() == 0, "Reactive methods may only be annotated with"
+ " @Scheduled if declared without arguments");
Assert.isTrue(candidateAdapter.getDescriptor().isDeferred(), "Reactive methods may only be annotated with"
+ " @Scheduled if the return type supports deferred execution");
return true;
}
/**
* Turn the invocation of the provided {@code Method} into a {@code Publisher},
* either by reflectively invoking it and converting the result to a {@code Publisher}
* via {@link ReactiveAdapterRegistry} or by converting a Kotlin suspending function
* into a {@code Publisher} via {@link CoroutinesUtils}.
* The {@link #isReactive(Method)} check is a precondition to calling this method.
* If Reactor is present at runtime, the Publisher is additionally converted to a {@code Flux}
* with a checkpoint String, allowing for better debugging.
*/
static Publisher<?> getPublisherFor(Method method, Object bean) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
return CoroutinesUtils.invokeSuspendingFunction(method, bean, (Object[]) method.getParameters());
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
Class<?> returnType = method.getReturnType();
ReactiveAdapter adapter = registry.getAdapter(returnType);
if (adapter == null) {
throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher");
}
if (!adapter.getDescriptor().isDeferred()) {
throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher, "
+ returnType.getSimpleName() + " is not a deferred reactive type");
}
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
try {
ReflectionUtils.makeAccessible(invocableMethod);
Object r = invocableMethod.invoke(bean);
Publisher<?> publisher = adapter.toPublisher(r);
//if Reactor is on the classpath, we could benefit from having a checkpoint for debuggability
if (reactorPresent) {
final String checkpoint = "@Scheduled '"+ method.getName() + "()' in bean '"
+ method.getDeclaringClass().getName() + "'";
return Flux.from(publisher).checkpoint(checkpoint);
}
else {
return publisher;
}
}
catch (InvocationTargetException ex) {
throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex);
}
}
/**
* Create a {@link Runnable} for the Scheduled infrastructure, allowing for scheduled
* subscription to the publisher produced by a reactive method.
* <p>Note that the reactive method is invoked once, but the resulting {@code Publisher}
* is subscribed to repeatedly, once per each invocation of the {@code Runnable}.
* <p>In the case of a {@code fixed delay} configuration, the subscription inside the
* Runnable is turned into a blocking call in order to maintain fixedDelay semantics
* (i.e. the task blocks until completion of the Publisher, then the delay is applied
* until next iteration).
*/
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
List<Runnable> subscriptionTrackerRegistry) {
boolean shouldBlock = scheduled.fixedDelay() > 0 || StringUtils.hasText(scheduled.fixedDelayString());
final Publisher<?> publisher = getPublisherFor(method, targetBean);
return new SubscribingRunnable(publisher, shouldBlock, subscriptionTrackerRegistry);
}
/**
* Utility implementation of {@code Runnable} that subscribes to a {@code Publisher}
* or subscribes-then-blocks if {@code shouldBlock} is set to {@code true}.
*/
static final class SubscribingRunnable implements Runnable {
final Publisher<?> publisher;
final boolean shouldBlock;
final List<Runnable> subscriptionTrackerRegistry;
SubscribingRunnable(Publisher<?> publisher, boolean shouldBlock, List<Runnable> subscriptionTrackerRegistry) {
this.publisher = publisher;
this.shouldBlock = shouldBlock;
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
}
@Override
public void run() {
if (this.shouldBlock) {
final CountDownLatch latch = new CountDownLatch(1);
TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry, latch);
this.subscriptionTrackerRegistry.add(subscriber);
this.publisher.subscribe(subscriber);
try {
latch.await();
}
catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
else {
final TrackingSubscriber subscriber = new TrackingSubscriber(this.subscriptionTrackerRegistry);
this.subscriptionTrackerRegistry.add(subscriber);
this.publisher.subscribe(subscriber);
}
}
}
/**
* A {@code Subscriber} which keeps track of its {@code Subscription} and exposes the
* capacity to cancel the subscription as a {@code Runnable}. Can optionally support
* blocking if a {@code CountDownLatch} is passed at construction.
*/
private static final class TrackingSubscriber implements Subscriber<Object>, Runnable {
private final List<Runnable> subscriptionTrackerRegistry;
@Nullable
private final CountDownLatch blockingLatch;
/*
Implementation note: since this is created last minute when subscribing,
there shouldn't be a way to cancel the tracker externally from the
ScheduledAnnotationBeanProcessor before the #setSubscription(Subscription)
method is called.
*/
@Nullable
private Subscription s;
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry) {
this(subscriptionTrackerRegistry, null);
}
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry, @Nullable CountDownLatch latch) {
this.subscriptionTrackerRegistry = subscriptionTrackerRegistry;
this.blockingLatch = latch;
}
@Override
public void run() {
if (this.s != null) {
this.s.cancel();
}
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
}
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
// NO-OP
}
@Override
public void onError(Throwable ex) {
this.subscriptionTrackerRegistry.remove(this);
LOGGER.warn("Unexpected error occurred in scheduled reactive task", ex);
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
}
}
@Override
public void onComplete() {
this.subscriptionTrackerRegistry.remove(this);
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
}
}
}
}