diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java index 0e1f8f9ab..abe449fc2 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/StreamListenerHandlerMethodTests.java @@ -63,6 +63,7 @@ import static org.springframework.cloud.stream.binding.StreamListenerErrorMessag /** * @author Marius Bogoevici * @author Ilayaperumal Gopinathan + * @author Gary Russell */ public class StreamListenerHandlerMethodTests { @@ -82,8 +83,6 @@ public class StreamListenerHandlerMethodTests { ConfigurableApplicationContext context = SpringApplication.run(TestMethodWithObjectAsMethodArgument.class, "--server.port=0"); Processor processor = context.getBean(Processor.class); - String id = UUID.randomUUID().toString(); - final CountDownLatch latch = new CountDownLatch(1); final String testMessage = "testing"; processor.input().send(MessageBuilder.withPayload(testMessage).build()); MessageCollector messageCollector = context.getBean(MessageCollector.class); @@ -93,6 +92,40 @@ public class StreamListenerHandlerMethodTests { context.close(); } + @Test + public void testMethodHeadersPropagatged() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestMethodHeadersPropagated.class, + "--server.port=0"); + Processor processor = context.getBean(Processor.class); + final String testMessage = "testing"; + processor.input().send(MessageBuilder.withPayload(testMessage) + .setHeader("foo", "bar") + .build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isEqualTo(testMessage.toUpperCase()); + assertThat(result.getHeaders().get("foo")).isEqualTo("bar"); + context.close(); + } + + @Test + public void testMethodHeadersNotPropagatged() throws Exception { + ConfigurableApplicationContext context = SpringApplication.run(TestMethodHeadersNotPropagated.class, + "--server.port=0"); + Processor processor = context.getBean(Processor.class); + final String testMessage = "testing"; + processor.input().send(MessageBuilder.withPayload(testMessage) + .setHeader("foo", "bar") + .build()); + MessageCollector messageCollector = context.getBean(MessageCollector.class); + Message result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS); + assertThat(result).isNotNull(); + assertThat(result.getPayload()).isEqualTo(testMessage.toUpperCase()); + assertThat(result.getHeaders().get("foo")).isNull(); + context.close(); + } + @Test public void testStreamListenerMethodWithTargetBeanFromOutside() throws Exception { ConfigurableApplicationContext context = SpringApplication @@ -333,6 +366,30 @@ public class StreamListenerHandlerMethodTests { } } + @EnableBinding({ Processor.class }) + @EnableAutoConfiguration + public static class TestMethodHeadersPropagated { + + @StreamListener(Processor.INPUT) + @SendTo(Processor.OUTPUT) + public String receive(String received) { + return received.toUpperCase(); + } + + } + + @EnableBinding({ Processor.class }) + @EnableAutoConfiguration + public static class TestMethodHeadersNotPropagated { + + @StreamListener(value = Processor.INPUT, copyHeaders = "${foo.bar:false}") + @SendTo(Processor.OUTPUT) + public String receive(String received) { + return received.toUpperCase(); + } + + } + @EnableBinding(Sink.class) @EnableAutoConfiguration public static class TestStreamListenerMethodWithTargetBeanFromOutside { diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java index fcd825cde..75d274b9a 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/annotation/StreamListener.java @@ -39,10 +39,10 @@ import org.springframework.messaging.handler.annotation.MessageMapping; * A method is considered declarative if all its method parameter types and return type * (if not void) are binding targets or conversion targets from binding targets via a * registered {@link StreamListenerParameterAdapter}. - * + * * Only declarative methods can have binding targets or conversion targets as arguments * and return type. - * + * * Declarative methods must specify what inputs and outputs correspond to their arguments * and return type, and can do this in one of the following ways. * @@ -118,6 +118,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping; * * @author Marius Bogoevici * @author Ilayaperumal Gopinathan + * @author Gary Russell * @see MessageMapping * @see EnableBinding * @see org.springframework.messaging.handler.annotation.SendTo @@ -147,4 +148,24 @@ public @interface StreamListener { * @return a SpEL expression that must evaluate to a {@code boolean} value. */ String condition() default ""; + + /** + * When "true" (default), and a {@code @SendTo} annotation is present, copy the + * inbound headers to the outbound message (if the header is absent on the outbound + * message). Can be an expression ({@code #{...}}) or property placeholder. Must + * resolve to a boolean or a string that is parsed by {@code Boolean.parseBoolean()}. + * An expression that resolves to {@code null} is interpreted to mean {@code false}. + * + * The expression is evaluated during application initialization, and not for each + * individual message. + * + * Prior to version 1.3.0, the default value used to be "false" and headers were + * not propagated by default. + * + * Starting with version 1.3.0, the default value is "true". + * + * @since 1.2.3 + */ + String copyHeaders() default "true"; + } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java index f3f62c2a6..3ad46fe15 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerAnnotationBeanPostProcessor.java @@ -40,6 +40,7 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.config.SpringIntegrationProperties; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ConfigurableApplicationContext; @@ -90,6 +91,9 @@ public class StreamListenerAnnotationBeanPostProcessor @Lazy private MessageHandlerMethodFactory messageHandlerMethodFactory; + @Autowired + private SpringIntegrationProperties springIntegrationProperties; + private ConfigurableApplicationContext applicationContext; private EvaluationContext evaluationContext; @@ -101,7 +105,6 @@ public class StreamListenerAnnotationBeanPostProcessor private BeanExpressionContext expressionContext; @Override - @SuppressWarnings({ "rawtypes", "unchecked" }) public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = (ConfigurableApplicationContext) applicationContext; } @@ -115,6 +118,7 @@ public class StreamListenerAnnotationBeanPostProcessor } } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Override public void afterPropertiesSet() throws Exception { Map parameterAdapterMap = BeanFactoryUtils @@ -226,7 +230,7 @@ public class StreamListenerAnnotationBeanPostProcessor private boolean isDeclarativeMethodParameter(String targetBeanName, MethodParameter methodParameter) { try { - Class targetBeanClass = this.applicationContext.getType(targetBeanName); + Class targetBeanClass = this.applicationContext.getType(targetBeanName); if (!methodParameter.getParameterType().equals(Object.class) && (targetBeanClass.isAssignableFrom(methodParameter.getParameterType()) || methodParameter.getParameterType().isAssignableFrom(targetBeanClass))) { @@ -335,7 +339,8 @@ public class StreamListenerAnnotationBeanPostProcessor } StreamListenerMethodUtils.validateStreamListenerMessageHandler(method); mappedListenerMethods.add(streamListener.value(), - new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel)); + new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel, + streamListener.copyHeaders())); } @Override @@ -349,7 +354,8 @@ public class StreamListenerAnnotationBeanPostProcessor .createInvocableHandlerMethod(mapping.getTargetBean(), checkProxy(mapping.getMethod(), mapping.getTargetBean())); StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler( - invocableHandlerMethod); + invocableHandlerMethod, resolveExpressionAsBoolean(mapping.getCopyHeaders(), "copyHeaders"), + springIntegrationProperties.getMessageHandlerNotPropagatedHeaders()); streamListenerMessageHandler.setApplicationContext(this.applicationContext); streamListenerMessageHandler.setBeanFactory(this.applicationContext.getBeanFactory()); if (StringUtils.hasText(mapping.getDefaultOutputChannel())) { @@ -357,7 +363,7 @@ public class StreamListenerAnnotationBeanPostProcessor } streamListenerMessageHandler.afterPropertiesSet(); if (StringUtils.hasText(mapping.getCondition())) { - String conditionAsString = resolveExpressionAsString(mapping.getCondition()); + String conditionAsString = resolveExpressionAsString(mapping.getCondition(), "condition"); Expression condition = SPEL_EXPRESSION_PARSER.parseExpression(conditionAsString); handlers.add( new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper( @@ -423,13 +429,31 @@ public class StreamListenerAnnotationBeanPostProcessor return method; } - private String resolveExpressionAsString(String value) { + private String resolveExpressionAsString(String value, String property) { Object resolved = resolveExpression(value); if (resolved instanceof String) { return (String) resolved; } else { - throw new IllegalStateException("Resolved to [" + resolved.getClass() + "] for [" + value + "]"); + throw new IllegalStateException( + "Resolved " + property + " to [" + resolved.getClass() + "] instead of String for [" + value + "]"); + } + } + + private boolean resolveExpressionAsBoolean(String value, String property) { + Object resolved = resolveExpression(value); + if (resolved == null) { + return false; + } + else if (resolved instanceof String) { + return Boolean.parseBoolean((String) resolved); + } + else if (resolved instanceof Boolean) { + return (Boolean) resolved; + } + else { + throw new IllegalStateException("Resolved " + property + " to [" + resolved.getClass() + + "] instead of String or Boolean for [" + value + "]"); } } @@ -457,20 +481,23 @@ public class StreamListenerAnnotationBeanPostProcessor private class StreamListenerHandlerMethodMapping { - private Object targetBean; + private final Object targetBean; - private Method method; + private final Method method; - private String condition; + private final String condition; - private String defaultOutputChannel; + private final String defaultOutputChannel; + + private final String copyHeaders; StreamListenerHandlerMethodMapping(Object targetBean, Method method, String condition, - String defaultOutputChannel) { + String defaultOutputChannel, String copyHeaders) { this.targetBean = targetBean; this.method = method; this.condition = condition; this.defaultOutputChannel = defaultOutputChannel; + this.copyHeaders = copyHeaders; } Object getTargetBean() { @@ -488,6 +515,11 @@ public class StreamListenerAnnotationBeanPostProcessor String getDefaultOutputChannel() { return defaultOutputChannel; } + + public String getCopyHeaders() { + return this.copyHeaders; + } + } } diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMessageHandler.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMessageHandler.java index 6a7d4ee8f..fcace8939 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMessageHandler.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/StreamListenerMessageHandler.java @@ -23,19 +23,26 @@ import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; /** * @author Marius Bogoevici + * @author Gary Russell * @since 1.2 */ public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler { private final InvocableHandlerMethod invocableHandlerMethod; - StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod) { + private final boolean copyHeaders; + + StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod, boolean copyHeaders, + String[] notPropagatedHeaders) { + super(); this.invocableHandlerMethod = invocableHandlerMethod; + this.copyHeaders = copyHeaders; + this.setNotPropagatedHeaders(notPropagatedHeaders); } @Override protected boolean shouldCopyRequestHeaders() { - return false; + return this.copyHeaders; } public boolean isVoid() {