GH-1028: error binding Content-Type negotiation
Fixes spring-cloud/spring-cloud-stream/#1028 Adding unit test to verify message conversion for error channel Instead of directly binding on the error channel, use a bridge channel and apply custom converters on that channel and then bind Make corresponding test changes renaming key used for error Adding tests for exceptions on error channel when content type is set
This commit is contained in:
@@ -22,6 +22,7 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
@@ -32,7 +33,6 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.PropertySource;
|
||||
import org.springframework.integration.annotation.InboundChannelAdapter;
|
||||
import org.springframework.integration.annotation.Poller;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.core.MessageSource;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
@@ -49,7 +49,8 @@ import org.springframework.util.Assert;
|
||||
public class ErrorChannelTests {
|
||||
|
||||
@Autowired
|
||||
private PublishSubscribeChannel errorChannel;
|
||||
@Qualifier(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL)
|
||||
private MessageChannel errorBridgeChannel;
|
||||
|
||||
@Autowired
|
||||
private BinderFactory binderFactory;
|
||||
@@ -57,7 +58,7 @@ public class ErrorChannelTests {
|
||||
@Test
|
||||
public void testErrorChannelBinding() throws Exception {
|
||||
Message<?> message = ((TestSupportBinder) binderFactory.getBinder(null, MessageChannel.class))
|
||||
.messageCollector().forChannel(errorChannel).poll(10, TimeUnit.SECONDS);
|
||||
.messageCollector().forChannel(errorBridgeChannel).poll(10, TimeUnit.SECONDS);
|
||||
Assert.isTrue(message instanceof ErrorMessage, "Message should be an instance of ErrorMessage");
|
||||
Assert.isTrue(message.getPayload() instanceof MessagingException, "Message payload should be an instance" +
|
||||
"of MessagingException");
|
||||
|
||||
@@ -54,12 +54,15 @@ import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.DependsOn;
|
||||
import org.springframework.expression.PropertyAccessor;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
import org.springframework.integration.channel.PublishSubscribeChannel;
|
||||
import org.springframework.integration.config.IntegrationEvaluationContextFactoryBean;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
|
||||
import org.springframework.integration.handler.BridgeHandler;
|
||||
import org.springframework.integration.json.JsonPropertyAccessor;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.converter.MessageConverter;
|
||||
import org.springframework.messaging.core.DestinationResolutionException;
|
||||
import org.springframework.messaging.core.DestinationResolver;
|
||||
@@ -83,7 +86,9 @@ public class BindingServiceConfiguration {
|
||||
|
||||
public static final String STREAM_LISTENER_ANNOTATION_BEAN_POST_PROCESSOR_NAME = "streamListenerAnnotationBeanPostProcessor";
|
||||
|
||||
private static final String ERROR_CHANNEL_NAME = "error";
|
||||
public static final String ERROR_BRIDGE_CHANNEL = "errorBridgeChannel";
|
||||
|
||||
private static final String ERROR_KEY_NAME = "error";
|
||||
|
||||
@Autowired(required = false)
|
||||
private ObjectMapper objectMapper;
|
||||
@@ -164,10 +169,23 @@ public class BindingServiceConfiguration {
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_CHANNEL_NAME + ".destination")
|
||||
public SingleBindingTargetBindable<MessageChannel> errorChannelBindable(
|
||||
@ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination")
|
||||
public MessageChannel errorBridgeChannel(
|
||||
@Qualifier(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) PublishSubscribeChannel errorChannel) {
|
||||
return new SingleBindingTargetBindable<MessageChannel>(ERROR_CHANNEL_NAME, errorChannel);
|
||||
SubscribableChannel errorBridgeChannel = new DirectChannel();
|
||||
BridgeHandler handler = new BridgeHandler();
|
||||
handler.setOutputChannel(errorBridgeChannel);
|
||||
errorChannel.subscribe(handler);
|
||||
return errorBridgeChannel;
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("spring.cloud.stream.bindings." + ERROR_KEY_NAME + ".destination")
|
||||
public SingleBindingTargetBindable<MessageChannel> errorBridgeChannelBindable(
|
||||
@Qualifier(ERROR_BRIDGE_CHANNEL) MessageChannel errorBridgeChannel,
|
||||
CompositeMessageChannelConfigurer compositeMessageChannelConfigurer) {
|
||||
compositeMessageChannelConfigurer.configureOutputChannel(errorBridgeChannel, ERROR_KEY_NAME);
|
||||
return new SingleBindingTargetBindable<>(ERROR_KEY_NAME, errorBridgeChannel);
|
||||
}
|
||||
|
||||
@Bean
|
||||
|
||||
@@ -16,19 +16,28 @@
|
||||
|
||||
package org.springframework.cloud.stream.binder;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.config.BindingServiceConfiguration;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
import org.springframework.cloud.stream.utils.MockBinderRegistryConfiguration;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.integration.context.IntegrationContextUtils;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
import org.springframework.messaging.MessageHandler;
|
||||
import org.springframework.messaging.MessagingException;
|
||||
import org.springframework.messaging.SubscribableChannel;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isNull;
|
||||
@@ -66,7 +75,7 @@ public class ErrorBindingTests {
|
||||
@SuppressWarnings("unchecked")
|
||||
Binder binder = binderFactory.getBinder(null, MessageChannel.class);
|
||||
|
||||
MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
|
||||
MessageChannel errorChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL,
|
||||
MessageChannel.class);
|
||||
|
||||
Mockito.verify(binder).bindConsumer(eq("input"), isNull(String.class), any(MessageChannel.class),
|
||||
@@ -77,10 +86,83 @@ public class ErrorBindingTests {
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorChannelIsBoundWithCorrectContentTypeConverter() {
|
||||
final AtomicBoolean received = new AtomicBoolean(false);
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class,
|
||||
"--spring.cloud.stream.bindings.error.destination=foo",
|
||||
"--spring.cloud.stream.bindings.error.content-type=application/json",
|
||||
"--server.port=0");
|
||||
|
||||
MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
|
||||
MessageChannel.class);
|
||||
MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL,
|
||||
MessageChannel.class);
|
||||
|
||||
((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() {
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
assertThat(message.getPayload()).isEqualTo("{\"foo\":\"bar\"}");
|
||||
received.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
Foo foo = new Foo();
|
||||
foo.setFoo("bar");
|
||||
|
||||
errorChannel.send(new GenericMessage<>(foo));
|
||||
assertThat(received.get()).isTrue();
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorChannelForExceptionWhenContentTypeIsSet() {
|
||||
final AtomicBoolean received = new AtomicBoolean(false);
|
||||
ConfigurableApplicationContext applicationContext = SpringApplication.run(TestProcessor.class,
|
||||
"--spring.cloud.stream.bindings.error.destination=foo",
|
||||
"--spring.cloud.stream.bindings.error.content-type=application/json",
|
||||
"--server.port=0");
|
||||
|
||||
MessageChannel errorChannel = applicationContext.getBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
|
||||
MessageChannel.class);
|
||||
MessageChannel errorBridgeChannel = applicationContext.getBean(BindingServiceConfiguration.ERROR_BRIDGE_CHANNEL,
|
||||
MessageChannel.class);
|
||||
|
||||
((SubscribableChannel)errorBridgeChannel).subscribe(new MessageHandler() {
|
||||
@Override
|
||||
public void handleMessage(Message<?> message) throws MessagingException {
|
||||
String payload = (String) message.getPayload();
|
||||
assertThat(payload.contains("cause")).isTrue();
|
||||
assertThat(payload.contains("stackTrace")).isTrue();
|
||||
assertThat(payload.contains("throwing exception")).isTrue();
|
||||
received.set(true);
|
||||
}
|
||||
});
|
||||
|
||||
Foo foo = new Foo();
|
||||
foo.setFoo("bar");
|
||||
|
||||
errorChannel.send(new GenericMessage<>(new Exception("throwing exception")));
|
||||
assertThat(received.get()).isTrue();
|
||||
applicationContext.close();
|
||||
}
|
||||
|
||||
@EnableBinding(Processor.class)
|
||||
@EnableAutoConfiguration
|
||||
@Import(MockBinderRegistryConfiguration.class)
|
||||
public static class TestProcessor {
|
||||
|
||||
}
|
||||
|
||||
private class Foo {
|
||||
String foo;
|
||||
|
||||
public String getFoo() {
|
||||
return foo;
|
||||
}
|
||||
|
||||
public void setFoo(String foo) {
|
||||
this.foo = foo;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user