diff --git a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java index 5732f8432e..d54f659165 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/annotation/ScheduledAnnotationBeanPostProcessor.java @@ -74,6 +74,7 @@ import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.scheduling.support.CronTrigger; import org.springframework.scheduling.support.ScheduledMethodRunnable; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; import org.springframework.util.StringValueResolver; @@ -122,6 +123,12 @@ public class ScheduledAnnotationBeanPostProcessor public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; + /** + * Reactive Streams API present on the classpath? + */ + private static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", ScheduledAnnotationBeanPostProcessor.class.getClassLoader()); + protected final Log logger = LogFactory.getLog(getClass()); private final ScheduledTaskRegistrar registrar; @@ -402,13 +409,63 @@ public class ScheduledAnnotationBeanPostProcessor protected void processScheduled(Scheduled scheduled, Method method, Object bean) { // Is the method a Kotlin suspending function? Throws if true and the reactor bridge isn't on the classpath. // Does the method return a reactive type? Throws if true and it isn't a deferred Publisher type. - if (ScheduledAnnotationReactiveSupport.isReactive(method)) { + if (reactiveStreamsPresent && ScheduledAnnotationReactiveSupport.isReactive(method)) { processScheduledAsync(scheduled, method, bean); return; } processScheduledSync(scheduled, method, bean); } + /** + * Process the given {@code @Scheduled} method declaration on the given bean, + * as a synchronous method. The method must accept no arguments. Its return value + * is ignored (if any), and the scheduled invocations of the method take place + * using the underlying {@link TaskScheduler} infrastructure. + * @param scheduled the {@code @Scheduled} annotation + * @param method the method that the annotation has been declared on + * @param bean the target bean instance + * @see #createRunnable(Object, Method) + */ + private void processScheduledSync(Scheduled scheduled, Method method, Object bean) { + Runnable task; + try { + task = createRunnable(bean, method); + } + catch (IllegalArgumentException ex) { + throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + + method.getName() + "': " + ex.getMessage()); + } + processScheduledTask(scheduled, task, method, bean); + } + + /** + * Process the given {@code @Scheduled} bean method declaration which returns + * a {@code Publisher}, or the given Kotlin suspending function converted to a + * {@code Publisher}. A {@code Runnable} which subscribes to that publisher is + * then repeatedly scheduled according to the annotation configuration. + *

Note that for fixed delay configuration, the subscription is turned into a blocking + * call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot + * be deferred (i.e. not a {@code Publisher}) are not supported. + * @param scheduled the {@code @Scheduled} annotation + * @param method the method that the annotation has been declared on, which + * must either return a Publisher-adaptable type or be a Kotlin suspending function + * @param bean the target bean instance + * @see ScheduledAnnotationReactiveSupport + */ + private void processScheduledAsync(Scheduled scheduled, Method method, Object bean) { + Runnable task; + try { + task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled, + this.registrar::getObservationRegistry, + this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>())); + } + catch (IllegalArgumentException ex) { + throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + + method.getName() + "': " + ex.getMessage()); + } + processScheduledTask(scheduled, task, method, bean); + } + /** * Parse the {@code Scheduled} annotation and schedule the provided {@code Runnable} * accordingly. The Runnable can represent either a synchronous method invocation @@ -419,7 +476,7 @@ public class ScheduledAnnotationBeanPostProcessor * @param method the method that the annotation has been declared on * @param bean the target bean instance */ - protected void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) { + private void processScheduledTask(Scheduled scheduled, Runnable runnable, Method method, Object bean) { try { boolean processedSchedule = false; String errorMessage = @@ -543,54 +600,6 @@ public class ScheduledAnnotationBeanPostProcessor } } - /** - * Process the given {@code @Scheduled} method declaration on the given bean, - * as a synchronous method. The method must accept no arguments. Its return value - * is ignored (if any), and the scheduled invocations of the method take place - * using the underlying {@link TaskScheduler} infrastructure. - * @param scheduled the {@code @Scheduled} annotation - * @param method the method that the annotation has been declared on - * @param bean the target bean instance - * @see #createRunnable(Object, Method) - */ - protected void processScheduledSync(Scheduled scheduled, Method method, Object bean) { - Runnable task; - try { - task = createRunnable(bean, method); - } - catch (IllegalArgumentException ex) { - throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage()); - } - processScheduledTask(scheduled, task, method, bean); - } - - /** - * Process the given {@code @Scheduled} bean method declaration which returns - * a {@code Publisher}, or the given Kotlin suspending function converted to a - * {@code Publisher}. A {@code Runnable} which subscribes to that publisher is - * then repeatedly scheduled according to the annotation configuration. - *

Note that for fixed delay configuration, the subscription is turned into a blocking - * call instead. Types for which a {@code ReactiveAdapter} is registered but which cannot - * be deferred (i.e. not a {@code Publisher}) are not supported. - * @param scheduled the {@code @Scheduled} annotation - * @param method the method that the annotation has been declared on, which - * must either return a Publisher-adaptable type or be a Kotlin suspending function - * @param bean the target bean instance - * @see ScheduledAnnotationReactiveSupport - */ - protected void processScheduledAsync(Scheduled scheduled, Method method, Object bean) { - Runnable task; - try { - task = ScheduledAnnotationReactiveSupport.createSubscriptionRunnable(method, bean, scheduled, - this.registrar::getObservationRegistry, - this.reactiveSubscriptions.computeIfAbsent(bean, k -> new CopyOnWriteArrayList<>())); - } - catch (IllegalArgumentException ex) { - throw new IllegalStateException("Could not create recurring task for @Scheduled method '" + method.getName() + "': " + ex.getMessage()); - } - processScheduledTask(scheduled, task, method, bean); - } - /** * Create a {@link Runnable} for the given bean instance, * calling the specified scheduled method. diff --git a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java index e1e95bae23..f1ad6ab709 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java +++ b/spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java @@ -107,18 +107,18 @@ public abstract class TransactionAspectSupport implements BeanFactoryAware, Init private static final String COROUTINES_FLOW_CLASS_NAME = "kotlinx.coroutines.flow.Flow"; + /** + * Reactive Streams API present on the classpath? + */ + private static final boolean reactiveStreamsPresent = ClassUtils.isPresent( + "org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader()); + /** * Vavr library present on the classpath? */ private static final boolean vavrPresent = ClassUtils.isPresent( "io.vavr.control.Try", TransactionAspectSupport.class.getClassLoader()); - /** - * Reactive Streams API present on the classpath? - */ - private static final boolean reactiveStreamsPresent = - ClassUtils.isPresent("org.reactivestreams.Publisher", TransactionAspectSupport.class.getClassLoader()); - /** * Holder to support the {@code currentTransactionStatus()} method, * and to support communication between different cooperating advices