From 85ede8a3fdcec70d7971f9a17e020d38c9733979 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Sun, 3 Dec 2017 10:43:45 -0500 Subject: [PATCH] General cleanup of StreamListenerAnnotationBeanPostProcessor - Removed BeanFactoryAware dependency - Simplified `isDeclarativeMethodParameter` and related operations --- ...amListenerAnnotationBeanPostProcessor.java | 161 +++++++----------- .../binding/StreamListenerMethodUtils.java | 9 + 2 files changed, 74 insertions(+), 96 deletions(-) diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java index 2caf3df51..8622cfe86 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java @@ -24,16 +24,12 @@ import java.util.Map; import org.springframework.aop.framework.Advised; import org.springframework.aop.support.AopUtils; import org.springframework.beans.BeansException; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.BeanInitializationException; import org.springframework.beans.factory.SmartInitializingSingleton; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.config.BeanExpressionContext; import org.springframework.beans.factory.config.BeanExpressionResolver; import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; @@ -61,6 +57,8 @@ import org.springframework.util.MultiValueMap; import org.springframework.util.ReflectionUtils; import org.springframework.util.StringUtils; + + /** * {@link BeanPostProcessor} that handles {@link StreamListener} annotations found on bean * methods. @@ -70,7 +68,7 @@ import org.springframework.util.StringUtils; * @author Soby Chacko */ public class StreamListenerAnnotationBeanPostProcessor - implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton { + implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton { private static final SpelExpressionParser SPEL_EXPRESSION_PARSER = new SpelExpressionParser(); @@ -99,8 +97,6 @@ public class StreamListenerAnnotationBeanPostProcessor private EvaluationContext evaluationContext; - private BeanFactory beanFactory; - private BeanExpressionResolver resolver; private BeanExpressionContext expressionContext; @@ -108,15 +104,8 @@ public class StreamListenerAnnotationBeanPostProcessor @Override public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; - } - - @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { - this.beanFactory = beanFactory; - if (beanFactory instanceof ConfigurableListableBeanFactory) { - this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver(); - this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null); - } + this.resolver = this.applicationContext.getBeanFactory().getBeanExpressionResolver(); + this.expressionContext = new BeanExpressionContext(this.applicationContext.getBeanFactory(), null); } @Override @@ -125,47 +114,40 @@ public class StreamListenerAnnotationBeanPostProcessor } @Override - public final Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { + public final Object postProcessAfterInitialization(Object bean, final String beanName) throws BeansException { Class targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass(); Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass); for (Method method : uniqueDeclaredMethods) { - StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, - StreamListener.class); + StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamListener.class); if (streamListener != null && !method.isBridge()) { - streamListener = postProcessAnnotation(streamListener, method); - Assert.isTrue(method.getAnnotation(Input.class) == null, - StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER); - String methodAnnotatedInboundName = streamListener.value(); - String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method); - int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method); - int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method); - boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, - methodAnnotatedOutboundName); - StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount, - outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName, - isDeclarative, streamListener.condition()); - if (!method.getReturnType().equals(Void.TYPE)) { - if (!StringUtils.hasText(methodAnnotatedOutboundName)) { - if (outputAnnotationCount == 0) { - throw new IllegalArgumentException( - StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED); - } - Assert.isTrue((outputAnnotationCount == 1), - StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED); - } - } - if (isDeclarative) { - invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, - methodAnnotatedOutboundName); - } - else { - registerHandlerMethodOnListenedChannel(method, streamListener, bean); - } + Assert.isTrue(method.getAnnotation(Input.class) == null, StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER); + this.doPostProcess(streamListener, method, bean); } } return bean; } + private void doPostProcess(StreamListener streamListener, Method method, Object bean) { + streamListener = postProcessAnnotation(streamListener, method); + + String methodAnnotatedInboundName = streamListener.value(); + String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method); + + int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method); + int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method); + boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName); + StreamListenerMethodUtils.validateStreamListenerMethod(method, + inputAnnotationCount, outputAnnotationCount, + methodAnnotatedInboundName, methodAnnotatedOutboundName, + isDeclarative, streamListener.condition()); + if (isDeclarative) { + invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, methodAnnotatedOutboundName); + } + else { + registerHandlerMethodOnListenedChannel(method, streamListener, bean); + } + } + /** * Extension point, allowing subclasses to customize the {@link StreamListener} * annotation detected by the postprocessor. @@ -178,8 +160,7 @@ public class StreamListenerAnnotationBeanPostProcessor return originalAnnotation; } - private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName, - String methodAnnotatedOutboundName) { + private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName, String methodAnnotatedOutboundName) { int methodArgumentsLength = method.getParameterTypes().length; for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) { MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex); @@ -187,51 +168,55 @@ public class StreamListenerAnnotationBeanPostProcessor String inboundName = (String) AnnotationUtils .getValue(methodParameter.getParameterAnnotation(Input.class)); Assert.isTrue(StringUtils.hasText(inboundName), StreamListenerErrorMessages.INVALID_INBOUND_NAME); - Assert.isTrue( - isDeclarativeMethodParameter(inboundName, methodParameter), + Assert.isTrue(isDeclarativeMethodParameter(inboundName, methodParameter), StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS); return true; } - if (methodParameter.hasParameterAnnotation(Output.class)) { + else if (methodParameter.hasParameterAnnotation(Output.class)) { String outboundName = (String) AnnotationUtils .getValue(methodParameter.getParameterAnnotation(Output.class)); Assert.isTrue(StringUtils.hasText(outboundName), StreamListenerErrorMessages.INVALID_OUTBOUND_NAME); - Assert.isTrue( - isDeclarativeMethodParameter(outboundName, methodParameter), + Assert.isTrue(isDeclarativeMethodParameter(outboundName, methodParameter), StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS); return true; } - if (StringUtils.hasText(methodAnnotatedOutboundName)) { + else if (StringUtils.hasText(methodAnnotatedOutboundName)) { return isDeclarativeMethodParameter(methodAnnotatedOutboundName, methodParameter); } - if (StringUtils.hasText(methodAnnotatedInboundName)) { + else if (StringUtils.hasText(methodAnnotatedInboundName)) { return isDeclarativeMethodParameter(methodAnnotatedInboundName, methodParameter); } } return false; } + /** + * Determines if method parameters signify an imperative or declarative listener definition. + *
+ * Imperative - where handler method is invoked on each message by the handler infrastructure provided + * by the framework + *
+ * Declarative - where handler is provided by the method itself. + *
+ * Declarative method parameter could either be {@link MessageChannel} or any other Object for which + * there is a {@link StreamListenerParameterAdapter} (i.e., {@link Flux}). Declarative method is invoked only + * once during initialization phase. + */ private boolean isDeclarativeMethodParameter(String targetBeanName, MethodParameter methodParameter) { - if (this.applicationContext.containsBean(targetBeanName)) { - Class targetBeanClass = this.applicationContext.getType(targetBeanName); - if (!methodParameter.getParameterType().equals(Object.class) - && (methodParameter.getParameterType().isAssignableFrom(targetBeanClass) || targetBeanClass.isAssignableFrom(methodParameter.getParameterType()))) { - return true; - } - else if (!this.streamListenerParameterAdapters.isEmpty()) { - for (StreamListenerParameterAdapter streamListenerParameterAdapter : this.streamListenerParameterAdapters) { - if (streamListenerParameterAdapter.supports(targetBeanClass, methodParameter)) { - return true; - } - } + boolean declarative = false; + if (!methodParameter.getParameterType().isAssignableFrom(Object.class) && this.applicationContext.containsBean(targetBeanName)) { + declarative = MessageChannel.class.isAssignableFrom(methodParameter.getParameterType()); + if (!declarative) { + Class targetBeanClass = this.applicationContext.getType(targetBeanName); + declarative = this.streamListenerParameterAdapters.stream() + .filter(slpa -> slpa.supports(targetBeanClass, methodParameter)).findFirst().isPresent(); } } - return false; + return declarative; } @SuppressWarnings({ "rawtypes", "unchecked" }) - private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName, - String outboundName) { + private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName, String outboundName) { Object[] arguments = new Object[method.getParameterTypes().length]; for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) { MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex); @@ -295,8 +280,7 @@ public class StreamListenerAnnotationBeanPostProcessor } } - protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, - Object bean) { + protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) { Assert.hasText(streamListener.value(), "The binding name cannot be null"); if (!StringUtils.hasText(streamListener.value())) { throw new BeanInitializationException("A bound component name must be specified"); @@ -408,8 +392,8 @@ public class StreamListenerAnnotationBeanPostProcessor return (String) resolved; } else { - throw new IllegalStateException( - "Resolved " + property + " to [" + resolved.getClass() + "] instead of String for [" + value + "]"); + throw new IllegalStateException("Resolved " + property + " to [" + resolved.getClass() + + "] instead of String for [" + value + "]"); } } @@ -430,26 +414,12 @@ public class StreamListenerAnnotationBeanPostProcessor } } - private Object resolveExpression(String value) { - String resolvedValue = resolve(value); - - if (!(resolvedValue.startsWith("#{") && value.endsWith("}"))) { - return resolvedValue; + private String resolveExpression(String value) { + String resolvedValue = this.applicationContext.getBeanFactory().resolveEmbeddedValue(value); + if (resolvedValue.startsWith("#{") && value.endsWith("}")) { + resolvedValue = (String) this.resolver.evaluate(resolvedValue, this.expressionContext); } - - return this.resolver.evaluate(resolvedValue, this.expressionContext); - } - - /** - * Resolve the specified value if possible. - * - * @see ConfigurableBeanFactory#resolveEmbeddedValue - */ - private String resolve(String value) { - if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) { - return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value); - } - return value; + return resolvedValue; } private class StreamListenerHandlerMethodMapping { @@ -492,6 +462,5 @@ public class StreamListenerAnnotationBeanPostProcessor public String getCopyHeaders() { return this.copyHeaders; } - } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java index ef3ace67d..ff9e74c9b 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java @@ -107,6 +107,15 @@ public class StreamListenerMethodUtils { StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS); } } + + if (!method.getReturnType().equals(Void.TYPE)) { + if (!StringUtils.hasText(methodAnnotatedOutboundName)) { + if (outputAnnotationCount == 0) { + throw new IllegalArgumentException(StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED); + } + Assert.isTrue((outputAnnotationCount == 1), StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED); + } + } } protected static void validateStreamListenerMessageHandler(Method method) {