From 22141a8dce93f32d10825ba9fb0cb4d5e29fcb95 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Mon, 24 Jul 2017 20:05:53 -0400 Subject: [PATCH] GH-1028: error binding Content-Type negotiation Fixes spring-cloud/spring-cloud-stream/#1028 Adding unit test to verify message conversion for error channel Instead of directly binding on the error channel, use a bridge channel and apply custom converters on that channel and then bind Make corresponding test changes renaming key used for error Adding tests for exceptions on error channel when content type is set --- .../stream/config/ErrorChannelTests.java | 7 +- .../config/BindingServiceConfiguration.java | 26 +++++- .../stream/binder/ErrorBindingTests.java | 84 ++++++++++++++++++- 3 files changed, 109 insertions(+), 8 deletions(-) diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java index 5cfe44214..180ad4986 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.annotation.EnableBinding; @@ -32,7 +33,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; -import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -49,7 +49,8 @@ import org.springframework.util.Assert; public class ErrorChannelTests { @Autowired - private PublishSubscribeChannel errorChannel; + @Qualifier(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL) + private MessageChannel errorBridgeChannel; @Autowired private BinderFactory binderFactory; @@ -57,7 +58,7 @@ public class ErrorChannelTests { @Test public void testErrorChannelBinding() throws Exception { Message message = ((TestSupportBinder) binderFactory.getBinder(null, MessageChannel.class)) - .messageCollector().forChannel(errorChannel).poll(10, TimeUnit.SECONDS); + .messageCollector().forChannel(errorBridgeChannel).poll(10, TimeUnit.SECONDS); Assert.isTrue(message instanceof ErrorMessage, "Message should be an instance of ErrorMessage"); Assert.isTrue(message.getPayload() instanceof MessagingException, "Message payload should be an instance" + "of MessagingException"); diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java index e0e014031..069a5526c 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java @@ -54,12 +54,15 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.expression.PropertyAccessor; +import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.config.IntegrationEvaluationContextFactoryBean; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.BridgeHandler; import org.springframework.integration.json.JsonPropertyAccessor; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.core.DestinationResolutionException; import org.springframework.messaging.core.DestinationResolver; @@ -83,7 +86,9 @@ public class BindingServiceConfiguration { public static final String STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME = "streamListenerAnnotationBeanPostProcessor"; - private static final String ERROR_CHANNEL_NAME = "error"; + public static final String ERROR_BRIDGE_CHANNEL = "errorBridgeChannel"; + + private static final String ERROR_KEY_NAME = "error"; @Autowired(required = false) private ObjectMapper objectMapper; @@ -164,10 +169,23 @@ public class BindingServiceConfiguration { } @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_CHANNEL_NAME + ".destination") - public SingleBindingTargetBindable errorChannelBindable( + @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination") + public MessageChannel errorBridgeChannel( @Qualifier(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) PublishSubscribeChannel errorChannel) { - return new SingleBindingTargetBindable(ERROR_CHANNEL_NAME, errorChannel); + SubscribableChannel errorBridgeChannel = new DirectChannel(); + BridgeHandler handler = new BridgeHandler(); + handler.setOutputChannel(errorBridgeChannel); + errorChannel.subscribe(handler); + return errorBridgeChannel; + } + + @Bean + @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination") + public SingleBindingTargetBindable errorBridgeChannelBindable( + @Qualifier(ERROR_BRIDGE_CHANNEL) MessageChannel errorBridgeChannel, + CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) { + compositeMessageChannelConfigurer.configureOutputChannel(errorBridgeChannel, ERROR_KEY_NAME); + return new SingleBindingTargetBindable<>(ERROR_KEY_NAME, errorBridgeChannel); } @Bean diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java index e9972deec..3f4346997 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java @@ -16,19 +16,28 @@ package org.springframework.cloud.stream.binder; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Test; import org.mockito.Mockito; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.config.BindingServiceConfiguration; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.utils.MockBinderRegistryConfiguration; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Import; import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.GenericMessage; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; @@ -66,7 +75,7 @@ public class ErrorBindingTests { @SuppressWarnings("unchecked") Binder binder = binderFactory.getBinder(null, MessageChannel.class); - MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel errorChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, MessageChannel.class); Mockito.verify(binder).bindConsumer(eq("input"), isNull(String.class), any(MessageChannel.class), @@ -77,10 +86,83 @@ public class ErrorBindingTests { applicationContext.close(); } + @Test + public void testErrorChannelIsBoundWithCorrectContentTypeConverter() { + final AtomicBoolean received = new AtomicBoolean(false); + ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class, + "--spring.cloud.stream.bindings.error.destination=foo", + "--spring.cloud.stream.bindings.error.content-type=application/json", + "--server.port=0"); + + MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, + MessageChannel.class); + + ((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + assertThat(message.getPayload()).isEqualTo("{\"foo\":\"bar\"}"); + received.set(true); + } + }); + + Foo foo = new Foo(); + foo.setFoo("bar"); + + errorChannel.send(new GenericMessage<>(foo)); + assertThat(received.get()).isTrue(); + applicationContext.close(); + } + + @Test + public void testErrorChannelForExceptionWhenContentTypeIsSet() { + final AtomicBoolean received = new AtomicBoolean(false); + ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class, + "--spring.cloud.stream.bindings.error.destination=foo", + "--spring.cloud.stream.bindings.error.content-type=application/json", + "--server.port=0"); + + MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, + MessageChannel.class); + + ((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + String payload = (String) message.getPayload(); + assertThat(payload.contains("cause")).isTrue(); + assertThat(payload.contains("stackTrace")).isTrue(); + assertThat(payload.contains("throwing exception")).isTrue(); + received.set(true); + } + }); + + Foo foo = new Foo(); + foo.setFoo("bar"); + + errorChannel.send(new GenericMessage<>(new Exception("throwing exception"))); + assertThat(received.get()).isTrue(); + applicationContext.close(); + } + @EnableBinding(Processor.class) @EnableAutoConfiguration @Import(MockBinderRegistryConfiguration.class) public static class TestProcessor { } + + private class Foo { + String foo; + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + } }