GH-943: Don't Propagate contentType
Resolves: https://github.com/spring-cloud/spring-cloud-stream/issues/943 There is a current limitation in that the contentType is not currently stripped if the message is forward unchanged. But this is likely unusual. See https://jira.spring.io/browse/INT-4272
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
package org.springframework.cloud.stream.config;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@@ -25,7 +26,11 @@ import java.util.Map;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.BeanFactoryUtils;
|
||||
import org.springframework.beans.factory.SmartInitializingSingleton;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.config.BeanPostProcessor;
|
||||
@@ -57,7 +62,10 @@ import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.config.IntegrationEvaluationContextFactoryBean;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.json.JsonPropertyAccessor;
|
||||
import org.springframework.integration.support.DefaultMessageBuilderFactory;
|
||||
import org.springframework.integration.support.utils.IntegrationUtils;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.core.DestinationResolutionException;
|
||||
import org.springframework.messaging.core.DestinationResolver;
|
||||
@@ -189,6 +197,48 @@ public class BindingServiceConfiguration {
|
||||
return new StreamListenerAnnotationBeanPostProcessor();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ReadOnlyHeadersAdjuster readOnlyHeadersAdjuster() {
|
||||
return new ReadOnlyHeadersAdjuster();
|
||||
}
|
||||
|
||||
public static class ReadOnlyHeadersAdjuster implements SmartInitializingSingleton, BeanFactoryAware {
|
||||
|
||||
private BeanFactory beanFactory;
|
||||
|
||||
@Override
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
this.beanFactory = beanFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterSingletonsInstantiated() {
|
||||
if (beanFactory.containsBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME)) {
|
||||
try {
|
||||
DefaultMessageBuilderFactory messageBuilderFactory = beanFactory.getBean(
|
||||
IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME,
|
||||
DefaultMessageBuilderFactory.class);
|
||||
// TODO: Add 'addReadOnlyHeader' to builder - INT-4272
|
||||
String[] readOnlyHeaders = (String[]) new DirectFieldAccessor(messageBuilderFactory)
|
||||
.getPropertyValue("readOnlyHeaders");
|
||||
if (readOnlyHeaders == null
|
||||
|| !Arrays.asList(readOnlyHeaders).contains(MessageHeaders.CONTENT_TYPE)) {
|
||||
String[] newReadOnlyHeaders = readOnlyHeaders == null ? new String[1]
|
||||
: Arrays.copyOf(readOnlyHeaders, readOnlyHeaders.length + 1);
|
||||
int pos = readOnlyHeaders != null ? readOnlyHeaders.length : 0;
|
||||
newReadOnlyHeaders[pos] = MessageHeaders.CONTENT_TYPE;
|
||||
messageBuilderFactory.setReadOnlyHeaders(newReadOnlyHeaders);
|
||||
}
|
||||
}
|
||||
catch (RuntimeException e) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// IMPORTANT: Nested class to avoid instantiating all of the above early
|
||||
@Configuration
|
||||
protected static class PostProcessorConfiguration {
|
||||
|
||||
@@ -31,9 +31,14 @@ import org.springframework.context.annotation.Import;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.support.DefaultMessageBuilderFactory;
|
||||
import org.springframework.integration.support.MessageBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
@@ -41,6 +46,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
* @author Ilayaperumal Gopinathan
|
||||
* @author Gary Russell
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@SpringBootTest(classes = SourceBindingWithBindingTargetsTests.TestSource.class)
|
||||
@@ -57,6 +63,9 @@ public class SourceBindingWithBindingTargetsTests {
|
||||
@Qualifier(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
|
||||
private PublishSubscribeChannel errorChannel;
|
||||
|
||||
@Autowired
|
||||
private DefaultMessageBuilderFactory messageBuilderFactory;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testSourceOutputChannelBound() {
|
||||
@@ -66,6 +75,15 @@ public class SourceBindingWithBindingTargetsTests {
|
||||
verifyNoMoreInteractions(binder);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadOnlyContentType() {
|
||||
Message<?> message = MessageBuilder.withPayload("foo")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, "text/plain")
|
||||
.build();
|
||||
message = this.messageBuilderFactory.withPayload("bar").copyHeaders(message.getHeaders()).build();
|
||||
assertThat(message.getHeaders().get(MessageHeaders.CONTENT_TYPE)).isNull();
|
||||
}
|
||||
|
||||
@EnableBinding(Source.class)
|
||||
@EnableAutoConfiguration
|
||||
@Import(MockBinderRegistryConfiguration.class)
|
||||
|
||||
Reference in New Issue
Block a user