General cleanup of StreamListenerAnnotationBeanPostProcessor
- Removed BeanFactoryAware dependency - Simplified `isDeclarativeMethodParameter` and related operations
This commit is contained in:
@@ -24,16 +24,12 @@ import java.util.Map;
|
||||
import org.springframework.aop.framework.Advised;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.BeanInitializationException;
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.config.BeanExpressionContext;
|
||||
import org.springframework.beans.factory.config.BeanExpressionResolver;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
||||
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
|
||||
import org.springframework.cloud.stream.annotation.Input;
|
||||
import org.springframework.cloud.stream.annotation.Output;
|
||||
import org.springframework.cloud.stream.annotation.StreamListener;
|
||||
@@ -61,6 +57,8 @@ import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.ReflectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* {@link BeanPostProcessor} that handles {@link StreamListener} annotations found on bean
|
||||
* methods.
|
||||
@@ -70,7 +68,7 @@ import org.springframework.util.StringUtils;
|
||||
* @author Soby Chacko
|
||||
*/
|
||||
public class StreamListenerAnnotationBeanPostProcessor
|
||||
implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton {
|
||||
implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton {
|
||||
|
||||
private static final SpelExpressionParser SPEL_EXPRESSION_PARSER = new SpelExpressionParser();
|
||||
|
||||
@@ -99,8 +97,6 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
|
||||
private EvaluationContext evaluationContext;
|
||||
|
||||
private BeanFactory beanFactory;
|
||||
|
||||
private BeanExpressionResolver resolver;
|
||||
|
||||
private BeanExpressionContext expressionContext;
|
||||
@@ -108,15 +104,8 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
@Override
|
||||
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
|
||||
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
this.beanFactory = beanFactory;
|
||||
if (beanFactory instanceof ConfigurableListableBeanFactory) {
|
||||
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
|
||||
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
|
||||
}
|
||||
this.resolver = this.applicationContext.getBeanFactory().getBeanExpressionResolver();
|
||||
this.expressionContext = new BeanExpressionContext(this.applicationContext.getBeanFactory(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -125,47 +114,40 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
|
||||
public final Object postProcessAfterInitialization(Object bean, final String beanName) throws BeansException {
|
||||
Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
|
||||
Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
|
||||
for (Method method : uniqueDeclaredMethods) {
|
||||
StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
|
||||
StreamListener.class);
|
||||
StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamListener.class);
|
||||
if (streamListener != null && !method.isBridge()) {
|
||||
streamListener = postProcessAnnotation(streamListener, method);
|
||||
Assert.isTrue(method.getAnnotation(Input.class) == null,
|
||||
StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
|
||||
String methodAnnotatedInboundName = streamListener.value();
|
||||
String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
|
||||
int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
|
||||
int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
|
||||
boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName,
|
||||
methodAnnotatedOutboundName);
|
||||
StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount,
|
||||
outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName,
|
||||
isDeclarative, streamListener.condition());
|
||||
if (!method.getReturnType().equals(Void.TYPE)) {
|
||||
if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
|
||||
if (outputAnnotationCount == 0) {
|
||||
throw new IllegalArgumentException(
|
||||
StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
|
||||
}
|
||||
Assert.isTrue((outputAnnotationCount == 1),
|
||||
StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
|
||||
}
|
||||
}
|
||||
if (isDeclarative) {
|
||||
invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName,
|
||||
methodAnnotatedOutboundName);
|
||||
}
|
||||
else {
|
||||
registerHandlerMethodOnListenedChannel(method, streamListener, bean);
|
||||
}
|
||||
Assert.isTrue(method.getAnnotation(Input.class) == null, StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
|
||||
this.doPostProcess(streamListener, method, bean);
|
||||
}
|
||||
}
|
||||
return bean;
|
||||
}
|
||||
|
||||
private void doPostProcess(StreamListener streamListener, Method method, Object bean) {
|
||||
streamListener = postProcessAnnotation(streamListener, method);
|
||||
|
||||
String methodAnnotatedInboundName = streamListener.value();
|
||||
String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
|
||||
|
||||
int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
|
||||
int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
|
||||
boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName);
|
||||
StreamListenerMethodUtils.validateStreamListenerMethod(method,
|
||||
inputAnnotationCount, outputAnnotationCount,
|
||||
methodAnnotatedInboundName, methodAnnotatedOutboundName,
|
||||
isDeclarative, streamListener.condition());
|
||||
if (isDeclarative) {
|
||||
invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, methodAnnotatedOutboundName);
|
||||
}
|
||||
else {
|
||||
registerHandlerMethodOnListenedChannel(method, streamListener, bean);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Extension point, allowing subclasses to customize the {@link StreamListener}
|
||||
* annotation detected by the postprocessor.
|
||||
@@ -178,8 +160,7 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
return originalAnnotation;
|
||||
}
|
||||
|
||||
private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName,
|
||||
String methodAnnotatedOutboundName) {
|
||||
private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName, String methodAnnotatedOutboundName) {
|
||||
int methodArgumentsLength = method.getParameterTypes().length;
|
||||
for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
|
||||
MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
|
||||
@@ -187,51 +168,55 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
String inboundName = (String) AnnotationUtils
|
||||
.getValue(methodParameter.getParameterAnnotation(Input.class));
|
||||
Assert.isTrue(StringUtils.hasText(inboundName), StreamListenerErrorMessages.INVALID_INBOUND_NAME);
|
||||
Assert.isTrue(
|
||||
isDeclarativeMethodParameter(inboundName, methodParameter),
|
||||
Assert.isTrue(isDeclarativeMethodParameter(inboundName, methodParameter),
|
||||
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
|
||||
return true;
|
||||
}
|
||||
if (methodParameter.hasParameterAnnotation(Output.class)) {
|
||||
else if (methodParameter.hasParameterAnnotation(Output.class)) {
|
||||
String outboundName = (String) AnnotationUtils
|
||||
.getValue(methodParameter.getParameterAnnotation(Output.class));
|
||||
Assert.isTrue(StringUtils.hasText(outboundName), StreamListenerErrorMessages.INVALID_OUTBOUND_NAME);
|
||||
Assert.isTrue(
|
||||
isDeclarativeMethodParameter(outboundName, methodParameter),
|
||||
Assert.isTrue(isDeclarativeMethodParameter(outboundName, methodParameter),
|
||||
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
|
||||
return true;
|
||||
}
|
||||
if (StringUtils.hasText(methodAnnotatedOutboundName)) {
|
||||
else if (StringUtils.hasText(methodAnnotatedOutboundName)) {
|
||||
return isDeclarativeMethodParameter(methodAnnotatedOutboundName, methodParameter);
|
||||
}
|
||||
if (StringUtils.hasText(methodAnnotatedInboundName)) {
|
||||
else if (StringUtils.hasText(methodAnnotatedInboundName)) {
|
||||
return isDeclarativeMethodParameter(methodAnnotatedInboundName, methodParameter);
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if method parameters signify an imperative or declarative listener definition.
|
||||
* <br>
|
||||
* Imperative - where handler method is invoked on each message by the handler infrastructure provided
|
||||
* by the framework
|
||||
* <br>
|
||||
* Declarative - where handler is provided by the method itself.
|
||||
* <br>
|
||||
* Declarative method parameter could either be {@link MessageChannel} or any other Object for which
|
||||
* there is a {@link StreamListenerParameterAdapter} (i.e., {@link Flux}). Declarative method is invoked only
|
||||
* once during initialization phase.
|
||||
*/
|
||||
private boolean isDeclarativeMethodParameter(String targetBeanName, MethodParameter methodParameter) {
|
||||
if (this.applicationContext.containsBean(targetBeanName)) {
|
||||
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
|
||||
if (!methodParameter.getParameterType().equals(Object.class)
|
||||
&& (methodParameter.getParameterType().isAssignableFrom(targetBeanClass) || targetBeanClass.isAssignableFrom(methodParameter.getParameterType()))) {
|
||||
return true;
|
||||
}
|
||||
else if (!this.streamListenerParameterAdapters.isEmpty()) {
|
||||
for (StreamListenerParameterAdapter<?, ?> streamListenerParameterAdapter : this.streamListenerParameterAdapters) {
|
||||
if (streamListenerParameterAdapter.supports(targetBeanClass, methodParameter)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
boolean declarative = false;
|
||||
if (!methodParameter.getParameterType().isAssignableFrom(Object.class) && this.applicationContext.containsBean(targetBeanName)) {
|
||||
declarative = MessageChannel.class.isAssignableFrom(methodParameter.getParameterType());
|
||||
if (!declarative) {
|
||||
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
|
||||
declarative = this.streamListenerParameterAdapters.stream()
|
||||
.filter(slpa -> slpa.supports(targetBeanClass, methodParameter)).findFirst().isPresent();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
return declarative;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName,
|
||||
String outboundName) {
|
||||
private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName, String outboundName) {
|
||||
Object[] arguments = new Object[method.getParameterTypes().length];
|
||||
for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) {
|
||||
MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
|
||||
@@ -295,8 +280,7 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
}
|
||||
}
|
||||
|
||||
protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener,
|
||||
Object bean) {
|
||||
protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) {
|
||||
Assert.hasText(streamListener.value(), "The binding name cannot be null");
|
||||
if (!StringUtils.hasText(streamListener.value())) {
|
||||
throw new BeanInitializationException("A bound component name must be specified");
|
||||
@@ -408,8 +392,8 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
return (String) resolved;
|
||||
}
|
||||
else {
|
||||
throw new IllegalStateException(
|
||||
"Resolved " + property + " to [" + resolved.getClass() + "] instead of String for [" + value + "]");
|
||||
throw new IllegalStateException("Resolved " + property + " to [" + resolved.getClass()
|
||||
+ "] instead of String for [" + value + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -430,26 +414,12 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
}
|
||||
}
|
||||
|
||||
private Object resolveExpression(String value) {
|
||||
String resolvedValue = resolve(value);
|
||||
|
||||
if (!(resolvedValue.startsWith("#{") && value.endsWith("}"))) {
|
||||
return resolvedValue;
|
||||
private String resolveExpression(String value) {
|
||||
String resolvedValue = this.applicationContext.getBeanFactory().resolveEmbeddedValue(value);
|
||||
if (resolvedValue.startsWith("#{") && value.endsWith("}")) {
|
||||
resolvedValue = (String) this.resolver.evaluate(resolvedValue, this.expressionContext);
|
||||
}
|
||||
|
||||
return this.resolver.evaluate(resolvedValue, this.expressionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve the specified value if possible.
|
||||
*
|
||||
* @see ConfigurableBeanFactory#resolveEmbeddedValue
|
||||
*/
|
||||
private String resolve(String value) {
|
||||
if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
|
||||
return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
|
||||
}
|
||||
return value;
|
||||
return resolvedValue;
|
||||
}
|
||||
|
||||
private class StreamListenerHandlerMethodMapping {
|
||||
@@ -492,6 +462,5 @@ public class StreamListenerAnnotationBeanPostProcessor
|
||||
public String getCopyHeaders() {
|
||||
return this.copyHeaders;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,6 +107,15 @@ public class StreamListenerMethodUtils {
|
||||
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
|
||||
}
|
||||
}
|
||||
|
||||
if (!method.getReturnType().equals(Void.TYPE)) {
|
||||
if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
|
||||
if (outputAnnotationCount == 0) {
|
||||
throw new IllegalArgumentException(StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
|
||||
}
|
||||
Assert.isTrue((outputAnnotationCount == 1), StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected static void validateStreamListenerMessageHandler(Method method) {
|
||||
|
||||
Reference in New Issue
Block a user