Polishing

This commit is contained in:
Sam Brannen
2023-06-06 11:29:25 +02:00
parent c0cd55a8fa
commit 05eab703cc
9 changed files with 126 additions and 127 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -32,28 +32,28 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar;
* {@link #cron}, {@link #fixedDelay}, or {@link #fixedRate} attributes must be
* specified.
*
* <p>The annotated method must expect no arguments. It will typically have
* <p>The annotated method must not accept arguments. It will typically have
* a {@code void} return type; if not, the returned value will be ignored
* when called through the scheduler.
*
* <p>Methods that return a reactive {@code Publisher} or a type which can be adapted
* to {@code Publisher} by the default {@code ReactiveAdapterRegistry} are supported.
* The {@code Publisher} MUST support multiple subsequent subscriptions (i.e. be cold).
* The returned Publisher is only produced once, and the scheduling infrastructure
* then periodically {@code subscribe()} to it according to configuration.
* Values emitted by the publisher are ignored. Errors are logged at WARN level, which
* doesn't prevent further iterations. If a {@code fixed delay} is configured, the
* subscription is blocked upon in order to respect the fixed delay semantics.
* The {@code Publisher} must support multiple subsequent subscriptions (i.e. be cold).
* The returned {@code Publisher} is only produced once, and the scheduling infrastructure
* then periodically subscribes to it according to configuration. Values emitted by
* the publisher are ignored. Errors are logged at {@code WARN} level, which
* doesn't prevent further iterations. If a fixed delay is configured, the
* subscription is blocked in order to respect the fixed delay semantics.
*
* <p>Kotlin suspending functions are also supported, provided the coroutine-reactor
* bridge ({@code kotlinx.coroutine.reactor}) is present at runtime. This bridge is
* used to adapt the suspending function into a {@code Publisher} which is treated
* used to adapt the suspending function to a {@code Publisher} which is treated
* the same way as in the reactive method case (see above).
*
* <p>Processing of {@code @Scheduled} annotations is performed by
* registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
* done manually or, more conveniently, through the {@code <task:annotation-driven/>}
* XML element or {@link EnableScheduling @EnableScheduling} annotation.
* <p>Processing of {@code @Scheduled} annotations is performed by registering a
* {@link ScheduledAnnotationBeanPostProcessor}. This can be done manually or,
* more conveniently, through the {@code <task:annotation-driven/>} XML element
* or {@link EnableScheduling @EnableScheduling} annotation.
*
* <p>This annotation can be used as a <em>{@linkplain Repeatable repeatable}</em>
* annotation.

View File

@@ -84,7 +84,7 @@ import org.springframework.util.StringValueResolver;
* "fixedRate", "fixedDelay", or "cron" expression provided via the annotation.
*
* <p>This post-processor is automatically registered by Spring's
* {@code <task:annotation-driven>} XML element, and also by the
* {@code <task:annotation-driven>} XML element and also by the
* {@link EnableScheduling @EnableScheduling} annotation.
*
* <p>Autodetects any {@link SchedulingConfigurer} instances in the container,
@@ -390,8 +390,9 @@ public class ScheduledAnnotationBeanPostProcessor
/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* attempting to distinguish {@link #processScheduledAsync(Scheduled, Method, Object) reactive}
* methods from {@link #processScheduledSync(Scheduled, Method, Object) synchronous} methods.
* attempting to distinguish {@linkplain #processScheduledAsync(Scheduled, Method, Object)
* reactive} methods from {@linkplain #processScheduledSync(Scheduled, Method, Object)
* synchronous} methods.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
* @param bean the target bean instance
@@ -399,8 +400,8 @@ public class ScheduledAnnotationBeanPostProcessor
* @see #processScheduledAsync(Scheduled, Method, Object)
*/
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
// Is method a Kotlin suspending function? Throws if true but reactor bridge isn't on the classpath.
// Is method returning a reactive type? Throws if true, but it isn't a deferred Publisher type.
// Is the method a Kotlin suspending function? Throws if true and the reactor bridge isn't on the classpath.
// Does the method return a reactive type? Throws if true and it isn't a deferred Publisher type.
if (ScheduledAnnotationReactiveSupport.isReactive(method)) {
processScheduledAsync(scheduled, method, bean);
return;
@@ -540,8 +541,8 @@ public class ScheduledAnnotationBeanPostProcessor
/**
* Process the given {@code @Scheduled} method declaration on the given bean,
* as a synchronous method. The method MUST take no arguments. Its return value
* is ignored (if any) and the scheduled invocations of the method take place
* as a synchronous method. The method must accept no arguments. Its return value
* is ignored (if any), and the scheduled invocations of the method take place
* using the underlying {@link TaskScheduler} infrastructure.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on
@@ -562,14 +563,14 @@ public class ScheduledAnnotationBeanPostProcessor
/**
* Process the given {@code @Scheduled} bean method declaration which returns
* a {@code Publisher}, or the given Kotlin suspending function converted to a
* Publisher. A {@code Runnable} which subscribes to that publisher is then repeatedly
* scheduled according to the annotation configuration.
* {@code Publisher}. A {@code Runnable} which subscribes to that publisher is
* then repeatedly scheduled according to the annotation configuration.
* <p>Note that for fixed delay configuration, the subscription is turned into a blocking
* call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot
* be deferred (i.e. not a {@code Publisher}) are not supported.
* be deferred (i.e. not a {@code Publisher}) are not supported.
* @param scheduled the {@code @Scheduled} annotation
* @param method the method that the annotation has been declared on, which
* MUST either return a Publisher-adaptable type or be a Kotlin suspending function
* must either return a Publisher-adaptable type or be a Kotlin suspending function
* @param bean the target bean instance
* @see ScheduledAnnotationReactiveSupport
*/
@@ -623,8 +624,8 @@ public class ScheduledAnnotationBeanPostProcessor
/**
* Return all currently scheduled tasks, from {@link Scheduled} methods
* as well as from programmatic {@link SchedulingConfigurer} interaction.
* <p>Note this includes upcoming scheduled subscriptions for reactive methods,
* but doesn't cover any currently active subscription for such methods.
* <p>Note that this includes upcoming scheduled subscriptions for reactive
* methods but doesn't cover any currently active subscription for such methods.
* @since 5.0.2
*/
@Override

View File

@@ -40,8 +40,9 @@ import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
/**
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive cases
* without a dependency on optional classes.
* Helper class for @{@link ScheduledAnnotationBeanPostProcessor} to support reactive
* cases without a dependency on optional classes.
*
* @author Simon Baslé
* @since 6.1.0
*/
@@ -58,23 +59,23 @@ abstract class ScheduledAnnotationReactiveSupport {
/**
* Checks that if the method is reactive, it can be scheduled. Methods are considered
* eligible for reactive scheduling if they either return an instance of a type that
* can be converted to {@code Publisher} or are a Kotlin Suspending Function.
* If the method isn't matching these criteria then this check returns {@code false}.
* <p>For scheduling of Kotlin Suspending Functions, the Coroutine-Reactor bridge
* {@code kotlinx.coroutines.reactor} MUST be present at runtime (in order to invoke
* suspending functions as a {@code Publisher}).
* Provided that is the case, this method returns {@code true}. Otherwise, it throws
* an {@code IllegalStateException}.
* can be converted to {@code Publisher} or are a Kotlin suspending function.
* If the method doesn't match these criteria, this check returns {@code false}.
* <p>For scheduling of Kotlin suspending functions, the Coroutine-Reactor bridge
* {@code kotlinx.coroutines.reactor} must be present at runtime (in order to invoke
* suspending functions as a {@code Publisher}). Provided that is the case, this
* method returns {@code true}. Otherwise, it throws an {@code IllegalStateException}.
* @throws IllegalStateException if the method is reactive but Reactor and/or the
* Kotlin coroutines bridge are not present at runtime
*/
static boolean isReactive(Method method) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
//Note that suspending functions declared without args have a single Continuation parameter in reflective inspection
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be"
+ " annotated with @Scheduled if declared without arguments");
Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with"
+ " @Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
// Note that suspending functions declared without args have a single Continuation
// parameter in reflective inspection
Assert.isTrue(method.getParameterCount() == 1,"Kotlin suspending functions may only be "
+ "annotated with @Scheduled if declared without arguments");
Assert.isTrue(coroutinesReactorPresent, "Kotlin suspending functions may only be annotated with "
+ "@Scheduled if the Coroutine-Reactor bridge (kotlinx.coroutines.reactor) is present at runtime");
return true;
}
ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@@ -86,10 +87,10 @@ abstract class ScheduledAnnotationReactiveSupport {
if (candidateAdapter == null) {
return false;
}
Assert.isTrue(method.getParameterCount() == 0, "Reactive methods may only be annotated with"
+ " @Scheduled if declared without arguments");
Assert.isTrue(candidateAdapter.getDescriptor().isDeferred(), "Reactive methods may only be annotated with"
+ " @Scheduled if the return type supports deferred execution");
Assert.isTrue(method.getParameterCount() == 0, "Reactive methods may only be annotated with "
+ "@Scheduled if declared without arguments");
Assert.isTrue(candidateAdapter.getDescriptor().isDeferred(), "Reactive methods may only be annotated with "
+ "@Scheduled if the return type supports deferred execution");
return true;
}
@@ -98,9 +99,9 @@ abstract class ScheduledAnnotationReactiveSupport {
* either by reflectively invoking it and converting the result to a {@code Publisher}
* via {@link ReactiveAdapterRegistry} or by converting a Kotlin suspending function
* into a {@code Publisher} via {@link CoroutinesUtils}.
* The {@link #isReactive(Method)} check is a precondition to calling this method.
* If Reactor is present at runtime, the Publisher is additionally converted to a {@code Flux}
* with a checkpoint String, allowing for better debugging.
* <p>The {@link #isReactive(Method)} check is a precondition to calling this method.
* If Reactor is present at runtime, the {@code Publisher} is additionally converted
* to a {@code Flux} with a checkpoint String, allowing for better debugging.
*/
static Publisher<?> getPublisherFor(Method method, Object bean) {
if (KotlinDetector.isKotlinPresent() && KotlinDetector.isSuspendingFunction(method)) {
@@ -114,7 +115,7 @@ abstract class ScheduledAnnotationReactiveSupport {
throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher");
}
if (!adapter.getDescriptor().isDeferred()) {
throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher, "
throw new IllegalArgumentException("Cannot convert the @Scheduled reactive method return type to Publisher: "
+ returnType.getSimpleName() + " is not a deferred reactive type");
}
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
@@ -123,7 +124,7 @@ abstract class ScheduledAnnotationReactiveSupport {
Object r = invocableMethod.invoke(bean);
Publisher<?> publisher = adapter.toPublisher(r);
//if Reactor is on the classpath, we could benefit from having a checkpoint for debuggability
// If Reactor is on the classpath, we could benefit from having a checkpoint for debuggability
if (reactorPresent) {
final String checkpoint = "@Scheduled '"+ method.getName() + "()' in bean '"
+ method.getDeclaringClass().getName() + "'";
@@ -134,10 +135,13 @@ abstract class ScheduledAnnotationReactiveSupport {
}
}
catch (InvocationTargetException ex) {
throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex.getTargetException());
throw new IllegalArgumentException(
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method",
ex.getTargetException());
}
catch (IllegalAccessException ex) {
throw new IllegalArgumentException("Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex);
throw new IllegalArgumentException(
"Cannot obtain a Publisher-convertible value from the @Scheduled reactive method", ex);
}
}
@@ -146,10 +150,10 @@ abstract class ScheduledAnnotationReactiveSupport {
* subscription to the publisher produced by a reactive method.
* <p>Note that the reactive method is invoked once, but the resulting {@code Publisher}
* is subscribed to repeatedly, once per each invocation of the {@code Runnable}.
* <p>In the case of a {@code fixed delay} configuration, the subscription inside the
* Runnable is turned into a blocking call in order to maintain fixedDelay semantics
* (i.e. the task blocks until completion of the Publisher, then the delay is applied
* until next iteration).
* <p>In the case of a fixed-delay configuration, the subscription inside the
* {@link Runnable} is turned into a blocking call in order to maintain fixed-delay
* semantics (i.e. the task blocks until completion of the Publisher, and the
* delay is applied until the next iteration).
*/
static Runnable createSubscriptionRunnable(Method method, Object targetBean, Scheduled scheduled,
List<Runnable> subscriptionTrackerRegistry) {
@@ -199,7 +203,7 @@ abstract class ScheduledAnnotationReactiveSupport {
/**
* A {@code Subscriber} which keeps track of its {@code Subscription} and exposes the
* capacity to cancel the subscription as a {@code Runnable}. Can optionally support
* blocking if a {@code CountDownLatch} is passed at construction.
* blocking if a {@code CountDownLatch} is supplied during construction.
*/
private static final class TrackingSubscriber implements Subscriber<Object>, Runnable {
@@ -208,14 +212,12 @@ abstract class ScheduledAnnotationReactiveSupport {
@Nullable
private final CountDownLatch blockingLatch;
/*
Implementation note: since this is created last minute when subscribing,
there shouldn't be a way to cancel the tracker externally from the
ScheduledAnnotationBeanProcessor before the #setSubscription(Subscription)
method is called.
*/
// Implementation note: since this is created last-minute when subscribing,
// there shouldn't be a way to cancel the tracker externally from the
// ScheduledAnnotationBeanProcessor before the #setSubscription(Subscription)
// method is called.
@Nullable
private Subscription s;
private Subscription subscription;
TrackingSubscriber(List<Runnable> subscriptionTrackerRegistry) {
this(subscriptionTrackerRegistry, null);
@@ -228,8 +230,8 @@ abstract class ScheduledAnnotationReactiveSupport {
@Override
public void run() {
if (this.s != null) {
this.s.cancel();
if (this.subscription != null) {
this.subscription.cancel();
}
if (this.blockingLatch != null) {
this.blockingLatch.countDown();
@@ -237,13 +239,13 @@ abstract class ScheduledAnnotationReactiveSupport {
}
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(Integer.MAX_VALUE);
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
public void onNext(Object obj) {
// NO-OP
}