GH-1010: @StreamListener: Fix Header Propagation

Fixes #1010

Propagate headers by default.

Add a `copyHeaders` property to `@StreamListener` to allow suppression of header propagation.

Honour header propagation settings
This commit is contained in:
Gary Russell
2017-07-13 15:23:07 -04:00
committed by Marius Bogoevici
parent 2910d27e09
commit 205d4579a4
4 changed files with 135 additions and 18 deletions

View File

@@ -63,6 +63,7 @@ import static org.springframework.cloud.stream.binding.StreamListenerErrorMessag
/**
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Gary Russell
*/
public class StreamListenerHandlerMethodTests {
@@ -82,8 +83,6 @@ public class StreamListenerHandlerMethodTests {
ConfigurableApplicationContext context = SpringApplication.run(TestMethodWithObjectAsMethodArgument.class,
"--server.port=0");
Processor processor = context.getBean(Processor.class);
String id = UUID.randomUUID().toString();
final CountDownLatch latch = new CountDownLatch(1);
final String testMessage = "testing";
processor.input().send(MessageBuilder.withPayload(testMessage).build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
@@ -93,6 +92,40 @@ public class StreamListenerHandlerMethodTests {
context.close();
}
@Test
public void testMethodHeadersPropagatged() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(TestMethodHeadersPropagated.class,
"--server.port=0");
Processor processor = context.getBean(Processor.class);
final String testMessage = "testing";
processor.input().send(MessageBuilder.withPayload(testMessage)
.setHeader("foo", "bar")
.build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(testMessage.toUpperCase());
assertThat(result.getHeaders().get("foo")).isEqualTo("bar");
context.close();
}
@Test
public void testMethodHeadersNotPropagatged() throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(TestMethodHeadersNotPropagated.class,
"--server.port=0");
Processor processor = context.getBean(Processor.class);
final String testMessage = "testing";
processor.input().send(MessageBuilder.withPayload(testMessage)
.setHeader("foo", "bar")
.build());
MessageCollector messageCollector = context.getBean(MessageCollector.class);
Message<?> result = messageCollector.forChannel(processor.output()).poll(1000, TimeUnit.MILLISECONDS);
assertThat(result).isNotNull();
assertThat(result.getPayload()).isEqualTo(testMessage.toUpperCase());
assertThat(result.getHeaders().get("foo")).isNull();
context.close();
}
@Test
public void testStreamListenerMethodWithTargetBeanFromOutside() throws Exception {
ConfigurableApplicationContext context = SpringApplication
@@ -333,6 +366,30 @@ public class StreamListenerHandlerMethodTests {
}
}
@EnableBinding({ Processor.class })
@EnableAutoConfiguration
public static class TestMethodHeadersPropagated {
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String receive(String received) {
return received.toUpperCase();
}
}
@EnableBinding({ Processor.class })
@EnableAutoConfiguration
public static class TestMethodHeadersNotPropagated {
@StreamListener(value = Processor.INPUT, copyHeaders = "${foo.bar:false}")
@SendTo(Processor.OUTPUT)
public String receive(String received) {
return received.toUpperCase();
}
}
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestStreamListenerMethodWithTargetBeanFromOutside {

View File

@@ -39,10 +39,10 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
* A method is considered declarative if all its method parameter types and return type
* (if not void) are binding targets or conversion targets from binding targets via a
* registered {@link StreamListenerParameterAdapter}.
*
*
* Only declarative methods can have binding targets or conversion targets as arguments
* and return type.
*
*
* Declarative methods must specify what inputs and outputs correspond to their arguments
* and return type, and can do this in one of the following ways.
*
@@ -118,6 +118,7 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
*
* @author Marius Bogoevici
* @author Ilayaperumal Gopinathan
* @author Gary Russell
* @see MessageMapping
* @see EnableBinding
* @see org.springframework.messaging.handler.annotation.SendTo
@@ -147,4 +148,24 @@ public @interface StreamListener {
* @return a SpEL expression that must evaluate to a {@code boolean} value.
*/
String condition() default "";
/**
* When "true" (default), and a {@code @SendTo} annotation is present, copy the
* inbound headers to the outbound message (if the header is absent on the outbound
* message). Can be an expression ({@code #{...}}) or property placeholder. Must
* resolve to a boolean or a string that is parsed by {@code Boolean.parseBoolean()}.
* An expression that resolves to {@code null} is interpreted to mean {@code false}.
*
* The expression is evaluated during application initialization, and not for each
* individual message.
*
* Prior to version 1.3.0, the default value used to be "false" and headers were
* not propagated by default.
*
* Starting with version 1.3.0, the default value is "true".
*
* @since 1.2.3
*/
String copyHeaders() default "true";
}

View File

@@ -40,6 +40,7 @@ import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.config.SpringIntegrationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
@@ -90,6 +91,9 @@ public class StreamListenerAnnotationBeanPostProcessor
@Lazy
private MessageHandlerMethodFactory messageHandlerMethodFactory;
@Autowired
private SpringIntegrationProperties springIntegrationProperties;
private ConfigurableApplicationContext applicationContext;
private EvaluationContext evaluationContext;
@@ -101,7 +105,6 @@ public class StreamListenerAnnotationBeanPostProcessor
private BeanExpressionContext expressionContext;
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@@ -115,6 +118,7 @@ public class StreamListenerAnnotationBeanPostProcessor
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
public void afterPropertiesSet() throws Exception {
Map<String, StreamListenerParameterAdapter> parameterAdapterMap = BeanFactoryUtils
@@ -226,7 +230,7 @@ public class StreamListenerAnnotationBeanPostProcessor
private boolean isDeclarativeMethodParameter(String targetBeanName, MethodParameter methodParameter) {
try {
Class targetBeanClass = this.applicationContext.getType(targetBeanName);
Class<?> targetBeanClass = this.applicationContext.getType(targetBeanName);
if (!methodParameter.getParameterType().equals(Object.class)
&& (targetBeanClass.isAssignableFrom(methodParameter.getParameterType()) ||
methodParameter.getParameterType().isAssignableFrom(targetBeanClass))) {
@@ -335,7 +339,8 @@ public class StreamListenerAnnotationBeanPostProcessor
}
StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
mappedListenerMethods.add(streamListener.value(),
new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel));
new StreamListenerHandlerMethodMapping(bean, method, streamListener.condition(), defaultOutputChannel,
streamListener.copyHeaders()));
}
@Override
@@ -349,7 +354,8 @@ public class StreamListenerAnnotationBeanPostProcessor
.createInvocableHandlerMethod(mapping.getTargetBean(),
checkProxy(mapping.getMethod(), mapping.getTargetBean()));
StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler(
invocableHandlerMethod);
invocableHandlerMethod, resolveExpressionAsBoolean(mapping.getCopyHeaders(), "copyHeaders"),
springIntegrationProperties.getMessageHandlerNotPropagatedHeaders());
streamListenerMessageHandler.setApplicationContext(this.applicationContext);
streamListenerMessageHandler.setBeanFactory(this.applicationContext.getBeanFactory());
if (StringUtils.hasText(mapping.getDefaultOutputChannel())) {
@@ -357,7 +363,7 @@ public class StreamListenerAnnotationBeanPostProcessor
}
streamListenerMessageHandler.afterPropertiesSet();
if (StringUtils.hasText(mapping.getCondition())) {
String conditionAsString = resolveExpressionAsString(mapping.getCondition());
String conditionAsString = resolveExpressionAsString(mapping.getCondition(), "condition");
Expression condition = SPEL_EXPRESSION_PARSER.parseExpression(conditionAsString);
handlers.add(
new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
@@ -423,13 +429,31 @@ public class StreamListenerAnnotationBeanPostProcessor
return method;
}
private String resolveExpressionAsString(String value) {
private String resolveExpressionAsString(String value, String property) {
Object resolved = resolveExpression(value);
if (resolved instanceof String) {
return (String) resolved;
}
else {
throw new IllegalStateException("Resolved to [" + resolved.getClass() + "] for [" + value + "]");
throw new IllegalStateException(
"Resolved " + property + " to [" + resolved.getClass() + "] instead of String for [" + value + "]");
}
}
private boolean resolveExpressionAsBoolean(String value, String property) {
Object resolved = resolveExpression(value);
if (resolved == null) {
return false;
}
else if (resolved instanceof String) {
return Boolean.parseBoolean((String) resolved);
}
else if (resolved instanceof Boolean) {
return (Boolean) resolved;
}
else {
throw new IllegalStateException("Resolved " + property + " to [" + resolved.getClass()
+ "] instead of String or Boolean for [" + value + "]");
}
}
@@ -457,20 +481,23 @@ public class StreamListenerAnnotationBeanPostProcessor
private class StreamListenerHandlerMethodMapping {
private Object targetBean;
private final Object targetBean;
private Method method;
private final Method method;
private String condition;
private final String condition;
private String defaultOutputChannel;
private final String defaultOutputChannel;
private final String copyHeaders;
StreamListenerHandlerMethodMapping(Object targetBean, Method method, String condition,
String defaultOutputChannel) {
String defaultOutputChannel, String copyHeaders) {
this.targetBean = targetBean;
this.method = method;
this.condition = condition;
this.defaultOutputChannel = defaultOutputChannel;
this.copyHeaders = copyHeaders;
}
Object getTargetBean() {
@@ -488,6 +515,11 @@ public class StreamListenerAnnotationBeanPostProcessor
String getDefaultOutputChannel() {
return defaultOutputChannel;
}
public String getCopyHeaders() {
return this.copyHeaders;
}
}
}

View File

@@ -23,19 +23,26 @@ import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
/**
* @author Marius Bogoevici
* @author Gary Russell
* @since 1.2
*/
public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {
private final InvocableHandlerMethod invocableHandlerMethod;
StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod) {
private final boolean copyHeaders;
StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod, boolean copyHeaders,
String[] notPropagatedHeaders) {
super();
this.invocableHandlerMethod = invocableHandlerMethod;
this.copyHeaders = copyHeaders;
this.setNotPropagatedHeaders(notPropagatedHeaders);
}
@Override
protected boolean shouldCopyRequestHeaders() {
return false;
return this.copyHeaders;
}
public boolean isVoid() {