diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
index 2caf3df51..8622cfe86 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java
@@ -24,16 +24,12 @@ import java.util.Map;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.BeanFactory;
-import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.BeanPostProcessor;
-import org.springframework.beans.factory.config.ConfigurableBeanFactory;
-import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
@@ -61,6 +57,8 @@ import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
+
+
/**
* {@link BeanPostProcessor} that handles {@link StreamListener} annotations found on bean
* methods.
@@ -70,7 +68,7 @@ import org.springframework.util.StringUtils;
* @author Soby Chacko
*/
public class StreamListenerAnnotationBeanPostProcessor
- implements BeanPostProcessor, ApplicationContextAware, BeanFactoryAware, SmartInitializingSingleton {
+ implements BeanPostProcessor, ApplicationContextAware, SmartInitializingSingleton {
private static final SpelExpressionParser SPEL_EXPRESSION_PARSER = new SpelExpressionParser();
@@ -99,8 +97,6 @@ public class StreamListenerAnnotationBeanPostProcessor
private EvaluationContext evaluationContext;
- private BeanFactory beanFactory;
-
private BeanExpressionResolver resolver;
private BeanExpressionContext expressionContext;
@@ -108,15 +104,8 @@ public class StreamListenerAnnotationBeanPostProcessor
@Override
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
- }
-
- @Override
- public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
- this.beanFactory = beanFactory;
- if (beanFactory instanceof ConfigurableListableBeanFactory) {
- this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
- this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
- }
+ this.resolver = this.applicationContext.getBeanFactory().getBeanExpressionResolver();
+ this.expressionContext = new BeanExpressionContext(this.applicationContext.getBeanFactory(), null);
}
@Override
@@ -125,47 +114,40 @@ public class StreamListenerAnnotationBeanPostProcessor
}
@Override
- public final Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
+ public final Object postProcessAfterInitialization(Object bean, final String beanName) throws BeansException {
Class> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean) : bean.getClass();
Method[] uniqueDeclaredMethods = ReflectionUtils.getUniqueDeclaredMethods(targetClass);
for (Method method : uniqueDeclaredMethods) {
- StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method,
- StreamListener.class);
+ StreamListener streamListener = AnnotatedElementUtils.findMergedAnnotation(method, StreamListener.class);
if (streamListener != null && !method.isBridge()) {
- streamListener = postProcessAnnotation(streamListener, method);
- Assert.isTrue(method.getAnnotation(Input.class) == null,
- StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
- String methodAnnotatedInboundName = streamListener.value();
- String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
- int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
- int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
- boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName,
- methodAnnotatedOutboundName);
- StreamListenerMethodUtils.validateStreamListenerMethod(method, inputAnnotationCount,
- outputAnnotationCount, methodAnnotatedInboundName, methodAnnotatedOutboundName,
- isDeclarative, streamListener.condition());
- if (!method.getReturnType().equals(Void.TYPE)) {
- if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
- if (outputAnnotationCount == 0) {
- throw new IllegalArgumentException(
- StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
- }
- Assert.isTrue((outputAnnotationCount == 1),
- StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
- }
- }
- if (isDeclarative) {
- invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName,
- methodAnnotatedOutboundName);
- }
- else {
- registerHandlerMethodOnListenedChannel(method, streamListener, bean);
- }
+ Assert.isTrue(method.getAnnotation(Input.class) == null, StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
+ this.doPostProcess(streamListener, method, bean);
}
}
return bean;
}
+ private void doPostProcess(StreamListener streamListener, Method method, Object bean) {
+ streamListener = postProcessAnnotation(streamListener, method);
+
+ String methodAnnotatedInboundName = streamListener.value();
+ String methodAnnotatedOutboundName = StreamListenerMethodUtils.getOutboundBindingTargetName(method);
+
+ int inputAnnotationCount = StreamListenerMethodUtils.inputAnnotationCount(method);
+ int outputAnnotationCount = StreamListenerMethodUtils.outputAnnotationCount(method);
+ boolean isDeclarative = checkDeclarativeMethod(method, methodAnnotatedInboundName, methodAnnotatedOutboundName);
+ StreamListenerMethodUtils.validateStreamListenerMethod(method,
+ inputAnnotationCount, outputAnnotationCount,
+ methodAnnotatedInboundName, methodAnnotatedOutboundName,
+ isDeclarative, streamListener.condition());
+ if (isDeclarative) {
+ invokeSetupMethodOnListenedChannel(method, bean, methodAnnotatedInboundName, methodAnnotatedOutboundName);
+ }
+ else {
+ registerHandlerMethodOnListenedChannel(method, streamListener, bean);
+ }
+ }
+
/**
* Extension point, allowing subclasses to customize the {@link StreamListener}
* annotation detected by the postprocessor.
@@ -178,8 +160,7 @@ public class StreamListenerAnnotationBeanPostProcessor
return originalAnnotation;
}
- private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName,
- String methodAnnotatedOutboundName) {
+ private boolean checkDeclarativeMethod(Method method, String methodAnnotatedInboundName, String methodAnnotatedOutboundName) {
int methodArgumentsLength = method.getParameterTypes().length;
for (int parameterIndex = 0; parameterIndex < methodArgumentsLength; parameterIndex++) {
MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
@@ -187,51 +168,55 @@ public class StreamListenerAnnotationBeanPostProcessor
String inboundName = (String) AnnotationUtils
.getValue(methodParameter.getParameterAnnotation(Input.class));
Assert.isTrue(StringUtils.hasText(inboundName), StreamListenerErrorMessages.INVALID_INBOUND_NAME);
- Assert.isTrue(
- isDeclarativeMethodParameter(inboundName, methodParameter),
+ Assert.isTrue(isDeclarativeMethodParameter(inboundName, methodParameter),
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
return true;
}
- if (methodParameter.hasParameterAnnotation(Output.class)) {
+ else if (methodParameter.hasParameterAnnotation(Output.class)) {
String outboundName = (String) AnnotationUtils
.getValue(methodParameter.getParameterAnnotation(Output.class));
Assert.isTrue(StringUtils.hasText(outboundName), StreamListenerErrorMessages.INVALID_OUTBOUND_NAME);
- Assert.isTrue(
- isDeclarativeMethodParameter(outboundName, methodParameter),
+ Assert.isTrue(isDeclarativeMethodParameter(outboundName, methodParameter),
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
return true;
}
- if (StringUtils.hasText(methodAnnotatedOutboundName)) {
+ else if (StringUtils.hasText(methodAnnotatedOutboundName)) {
return isDeclarativeMethodParameter(methodAnnotatedOutboundName, methodParameter);
}
- if (StringUtils.hasText(methodAnnotatedInboundName)) {
+ else if (StringUtils.hasText(methodAnnotatedInboundName)) {
return isDeclarativeMethodParameter(methodAnnotatedInboundName, methodParameter);
}
}
return false;
}
+ /**
+ * Determines if method parameters signify an imperative or declarative listener definition.
+ *
+ * Imperative - where handler method is invoked on each message by the handler infrastructure provided
+ * by the framework
+ *
+ * Declarative - where handler is provided by the method itself.
+ *
+ * Declarative method parameter could either be {@link MessageChannel} or any other Object for which
+ * there is a {@link StreamListenerParameterAdapter} (i.e., {@link Flux}). Declarative method is invoked only
+ * once during initialization phase.
+ */
private boolean isDeclarativeMethodParameter(String targetBeanName, MethodParameter methodParameter) {
- if (this.applicationContext.containsBean(targetBeanName)) {
- Class> targetBeanClass = this.applicationContext.getType(targetBeanName);
- if (!methodParameter.getParameterType().equals(Object.class)
- && (methodParameter.getParameterType().isAssignableFrom(targetBeanClass) || targetBeanClass.isAssignableFrom(methodParameter.getParameterType()))) {
- return true;
- }
- else if (!this.streamListenerParameterAdapters.isEmpty()) {
- for (StreamListenerParameterAdapter, ?> streamListenerParameterAdapter : this.streamListenerParameterAdapters) {
- if (streamListenerParameterAdapter.supports(targetBeanClass, methodParameter)) {
- return true;
- }
- }
+ boolean declarative = false;
+ if (!methodParameter.getParameterType().isAssignableFrom(Object.class) && this.applicationContext.containsBean(targetBeanName)) {
+ declarative = MessageChannel.class.isAssignableFrom(methodParameter.getParameterType());
+ if (!declarative) {
+ Class> targetBeanClass = this.applicationContext.getType(targetBeanName);
+ declarative = this.streamListenerParameterAdapters.stream()
+ .filter(slpa -> slpa.supports(targetBeanClass, methodParameter)).findFirst().isPresent();
}
}
- return false;
+ return declarative;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
- private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName,
- String outboundName) {
+ private void invokeSetupMethodOnListenedChannel(Method method, Object bean, String inboundName, String outboundName) {
Object[] arguments = new Object[method.getParameterTypes().length];
for (int parameterIndex = 0; parameterIndex < arguments.length; parameterIndex++) {
MethodParameter methodParameter = MethodParameter.forMethodOrConstructor(method, parameterIndex);
@@ -295,8 +280,7 @@ public class StreamListenerAnnotationBeanPostProcessor
}
}
- protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener,
- Object bean) {
+ protected final void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) {
Assert.hasText(streamListener.value(), "The binding name cannot be null");
if (!StringUtils.hasText(streamListener.value())) {
throw new BeanInitializationException("A bound component name must be specified");
@@ -408,8 +392,8 @@ public class StreamListenerAnnotationBeanPostProcessor
return (String) resolved;
}
else {
- throw new IllegalStateException(
- "Resolved " + property + " to [" + resolved.getClass() + "] instead of String for [" + value + "]");
+ throw new IllegalStateException("Resolved " + property + " to [" + resolved.getClass()
+ + "] instead of String for [" + value + "]");
}
}
@@ -430,26 +414,12 @@ public class StreamListenerAnnotationBeanPostProcessor
}
}
- private Object resolveExpression(String value) {
- String resolvedValue = resolve(value);
-
- if (!(resolvedValue.startsWith("#{") && value.endsWith("}"))) {
- return resolvedValue;
+ private String resolveExpression(String value) {
+ String resolvedValue = this.applicationContext.getBeanFactory().resolveEmbeddedValue(value);
+ if (resolvedValue.startsWith("#{") && value.endsWith("}")) {
+ resolvedValue = (String) this.resolver.evaluate(resolvedValue, this.expressionContext);
}
-
- return this.resolver.evaluate(resolvedValue, this.expressionContext);
- }
-
- /**
- * Resolve the specified value if possible.
- *
- * @see ConfigurableBeanFactory#resolveEmbeddedValue
- */
- private String resolve(String value) {
- if (this.beanFactory != null && this.beanFactory instanceof ConfigurableBeanFactory) {
- return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
- }
- return value;
+ return resolvedValue;
}
private class StreamListenerHandlerMethodMapping {
@@ -492,6 +462,5 @@ public class StreamListenerAnnotationBeanPostProcessor
public String getCopyHeaders() {
return this.copyHeaders;
}
-
}
}
diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
index ef3ace67d..ff9e74c9b 100644
--- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
+++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMethodUtils.java
@@ -107,6 +107,15 @@ public class StreamListenerMethodUtils {
StreamListenerErrorMessages.INVALID_DECLARATIVE_METHOD_PARAMETERS);
}
}
+
+ if (!method.getReturnType().equals(Void.TYPE)) {
+ if (!StringUtils.hasText(methodAnnotatedOutboundName)) {
+ if (outputAnnotationCount == 0) {
+ throw new IllegalArgumentException(StreamListenerErrorMessages.RETURN_TYPE_NO_OUTBOUND_SPECIFIED);
+ }
+ Assert.isTrue((outputAnnotationCount == 1), StreamListenerErrorMessages.RETURN_TYPE_MULTIPLE_OUTBOUND_SPECIFIED);
+ }
+ }
}
protected static void validateStreamListenerMessageHandler(Method method) {