Support for reactive result values from event listener methods

Closes gh-21831
This commit is contained in:
Juergen Hoeller
2019-07-05 16:19:23 +02:00
parent 97d020c509
commit 7bfe01a028
6 changed files with 169 additions and 31 deletions

View File

@@ -24,9 +24,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
@@ -34,20 +37,24 @@ import org.springframework.context.ApplicationEvent;
import org.springframework.context.PayloadApplicationEvent;
import org.springframework.context.expression.AnnotatedElementKey;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.Order;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* {@link GenericApplicationListener} adapter that delegates the processing of
* an event to an {@link EventListener} annotated method.
*
* <p>Delegates to {@link #processEvent(ApplicationEvent)} to give sub-classes
* <p>Delegates to {@link #processEvent(ApplicationEvent)} to give subclasses
* a chance to deviate from the default. Unwraps the content of a
* {@link PayloadApplicationEvent} if necessary to allow a method declaration
* to define any arbitrary event type. If a condition is defined, it is
@@ -60,6 +67,10 @@ import org.springframework.util.StringUtils;
*/
public class ApplicationListenerMethodAdapter implements GenericApplicationListener {
private static final boolean reactiveStreamsPresent = ClassUtils.isPresent(
"org.reactivestreams.Publisher", ApplicationListenerMethodAdapter.class.getClassLoader());
protected final Log logger = LogFactory.getLog(getClass());
private final String beanName;
@@ -213,6 +224,30 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
}
protected void handleResult(Object result) {
if (reactiveStreamsPresent && new ReactiveResultHandler().subscribeToPublisher(result)) {
if (logger.isTraceEnabled()) {
logger.trace("Adapted to reactive result: " + result);
}
}
else if (result instanceof CompletionStage) {
((CompletionStage<?>) result).whenComplete((event, ex) -> {
if (ex != null) {
handleAsyncError(ex);
}
else if (event != null) {
publishEvent(event);
}
});
}
else if (result instanceof ListenableFuture) {
((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError);
}
else {
publishEvents(result);
}
}
private void publishEvents(Object result) {
if (result.getClass().isArray()) {
Object[] events = ObjectUtils.toObjectArray(result);
for (Object event : events) {
@@ -237,6 +272,10 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
}
}
protected void handleAsyncError(Throwable t) {
logger.error("Unexpected error occurred in asynchronous listener", t);
}
private boolean shouldHandle(ApplicationEvent event, @Nullable Object[] args) {
if (args == null) {
return false;
@@ -376,4 +415,40 @@ public class ApplicationListenerMethodAdapter implements GenericApplicationListe
return this.method.toGenericString();
}
private class ReactiveResultHandler {
public boolean subscribeToPublisher(Object result) {
ReactiveAdapter adapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(result.getClass());
if (adapter != null) {
adapter.toPublisher(result).subscribe(new EventPublicationSubscriber());
return true;
}
return false;
}
}
private class EventPublicationSubscriber implements Subscriber<Object> {
@Override
public void onSubscribe(Subscription s) {
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Object o) {
publishEvents(o);
}
@Override
public void onError(Throwable t) {
handleAsyncError(t);
}
@Override
public void onComplete() {
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -92,9 +92,7 @@ public abstract class TaskUtils {
@Override
public void handleError(Throwable t) {
if (logger.isErrorEnabled()) {
logger.error("Unexpected error occurred in scheduled task.", t);
}
logger.error("Unexpected error occurred in scheduled task", t);
}
}