diff --git a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java index 5cfe44214..180ad4986 100644 --- a/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java +++ b/spring-cloud-stream-integration-tests/src/test/java/org/springframework/cloud/stream/config/ErrorChannelTests.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.annotation.EnableBinding; @@ -32,7 +33,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.PropertySource; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; -import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -49,7 +49,8 @@ import org.springframework.util.Assert; public class ErrorChannelTests { @Autowired - private PublishSubscribeChannel errorChannel; + @Qualifier(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL) + private MessageChannel errorBridgeChannel; @Autowired private BinderFactory binderFactory; @@ -57,7 +58,7 @@ public class ErrorChannelTests { @Test public void testErrorChannelBinding() throws Exception { Message message = ((TestSupportBinder) binderFactory.getBinder(null, MessageChannel.class)) - .messageCollector().forChannel(errorChannel).poll(10, TimeUnit.SECONDS); + .messageCollector().forChannel(errorBridgeChannel).poll(10, TimeUnit.SECONDS); Assert.isTrue(message instanceof ErrorMessage, "Message should be an instance of ErrorMessage"); Assert.isTrue(message.getPayload() instanceof MessagingException, "Message payload should be an instance" + "of MessagingException"); diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java index e0e014031..069a5526c 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/config/BindingServiceConfiguration.java @@ -54,12 +54,15 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.DependsOn; import org.springframework.expression.PropertyAccessor; +import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.config.IntegrationEvaluationContextFactoryBean; import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; +import org.springframework.integration.handler.BridgeHandler; import org.springframework.integration.json.JsonPropertyAccessor; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.SubscribableChannel; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.core.DestinationResolutionException; import org.springframework.messaging.core.DestinationResolver; @@ -83,7 +86,9 @@ public class BindingServiceConfiguration { public static final String STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME = "streamListenerAnnotationBeanPostProcessor"; - private static final String ERROR_CHANNEL_NAME = "error"; + public static final String ERROR_BRIDGE_CHANNEL = "errorBridgeChannel"; + + private static final String ERROR_KEY_NAME = "error"; @Autowired(required = false) private ObjectMapper objectMapper; @@ -164,10 +169,23 @@ public class BindingServiceConfiguration { } @Bean - @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_CHANNEL_NAME + ".destination") - public SingleBindingTargetBindable errorChannelBindable( + @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination") + public MessageChannel errorBridgeChannel( @Qualifier(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) PublishSubscribeChannel errorChannel) { - return new SingleBindingTargetBindable(ERROR_CHANNEL_NAME, errorChannel); + SubscribableChannel errorBridgeChannel = new DirectChannel(); + BridgeHandler handler = new BridgeHandler(); + handler.setOutputChannel(errorBridgeChannel); + errorChannel.subscribe(handler); + return errorBridgeChannel; + } + + @Bean + @ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination") + public SingleBindingTargetBindable errorBridgeChannelBindable( + @Qualifier(ERROR_BRIDGE_CHANNEL) MessageChannel errorBridgeChannel, + CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) { + compositeMessageChannelConfigurer.configureOutputChannel(errorBridgeChannel, ERROR_KEY_NAME); + return new SingleBindingTargetBindable<>(ERROR_KEY_NAME, errorBridgeChannel); } @Bean diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java index e9972deec..3f4346997 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binder/ErrorBindingTests.java @@ -16,19 +16,28 @@ package org.springframework.cloud.stream.binder; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.Test; import org.mockito.Mockito; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.config.BindingServiceConfiguration; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.cloud.stream.utils.MockBinderRegistryConfiguration; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Import; import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.GenericMessage; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; @@ -66,7 +75,7 @@ public class ErrorBindingTests { @SuppressWarnings("unchecked") Binder binder = binderFactory.getBinder(null, MessageChannel.class); - MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel errorChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, MessageChannel.class); Mockito.verify(binder).bindConsumer(eq("input"), isNull(String.class), any(MessageChannel.class), @@ -77,10 +86,83 @@ public class ErrorBindingTests { applicationContext.close(); } + @Test + public void testErrorChannelIsBoundWithCorrectContentTypeConverter() { + final AtomicBoolean received = new AtomicBoolean(false); + ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class, + "--spring.cloud.stream.bindings.error.destination=foo", + "--spring.cloud.stream.bindings.error.content-type=application/json", + "--server.port=0"); + + MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, + MessageChannel.class); + + ((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + assertThat(message.getPayload()).isEqualTo("{\"foo\":\"bar\"}"); + received.set(true); + } + }); + + Foo foo = new Foo(); + foo.setFoo("bar"); + + errorChannel.send(new GenericMessage<>(foo)); + assertThat(received.get()).isTrue(); + applicationContext.close(); + } + + @Test + public void testErrorChannelForExceptionWhenContentTypeIsSet() { + final AtomicBoolean received = new AtomicBoolean(false); + ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class, + "--spring.cloud.stream.bindings.error.destination=foo", + "--spring.cloud.stream.bindings.error.content-type=application/json", + "--server.port=0"); + + MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, + MessageChannel.class); + MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL, + MessageChannel.class); + + ((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + String payload = (String) message.getPayload(); + assertThat(payload.contains("cause")).isTrue(); + assertThat(payload.contains("stackTrace")).isTrue(); + assertThat(payload.contains("throwing exception")).isTrue(); + received.set(true); + } + }); + + Foo foo = new Foo(); + foo.setFoo("bar"); + + errorChannel.send(new GenericMessage<>(new Exception("throwing exception"))); + assertThat(received.get()).isTrue(); + applicationContext.close(); + } + @EnableBinding(Processor.class) @EnableAutoConfiguration @Import(MockBinderRegistryConfiguration.class) public static class TestProcessor { } + + private class Foo { + String foo; + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + } }