Replaced ConcurrentTarget and the <concurrency/> element with ConcurrencyInterceptor.

This commit is contained in:
Mark Fisher
2008-07-01 19:38:55 +00:00
parent 117973082e
commit f3f7daa201
24 changed files with 342 additions and 639 deletions

View File

@@ -80,7 +80,7 @@ public class DirectChannelSubscriptionTests {
bus.stop();
}
@Test(expected=MessagingException.class)
@Test(expected=RuntimeException.class)
public void testExceptionThrownFromRegisteredEndpoint() {
QueueChannel errorChannel = new QueueChannel();
bus.setErrorChannel(errorChannel);

View File

@@ -31,14 +31,11 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.RendezvousChannel;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.SourceEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.PollingSchedule;
@@ -184,41 +181,6 @@ public class MessageBusTests {
bus.stop();
}
@Test
public void testDefaultConcurrencyPolicy() throws InterruptedException {
MessageBus bus = new MessageBus();
bus.setDefaultConcurrencyPolicy(new ConcurrencyPolicy(1, 3));
final CountDownLatch latch = new CountDownLatch(3);
MessageHandler testHandler = new MessageHandler() {
public Message<?> handle(Message<?> message) {
latch.countDown();
try {
Thread.sleep(5000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
};
DispatcherPolicy dispatcherPolicy = new DispatcherPolicy();
dispatcherPolicy.setRejectionLimit(1);
dispatcherPolicy.setRetryInterval(0);
RendezvousChannel testChannel = new RendezvousChannel(dispatcherPolicy);
bus.registerChannel("testChannel", testChannel);
bus.registerHandler("testHandler", testHandler, new Subscription(testChannel));
bus.start();
for (int i = 0; i < 4; i++) {
assertTrue(testChannel.send(new StringMessage("test-"+ i), 1000));
}
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
MessageChannel errorChannel = bus.getErrorChannel();
Message<?> errorMessage = errorChannel.receive(500);
assertNotNull(errorMessage);
assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass());
}
@Test(expected = BeanCreationException.class)
public void testMultipleMessageBusBeans() {
new ClassPathXmlApplicationContext("multipleMessageBusBeans.xml", this.getClass());

View File

@@ -27,19 +27,23 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.bus.SubscriptionManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.DispatcherPolicy;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.MessageEndpointBeanPostProcessor;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.MessageHandlerRejectedExecutionException;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.PayloadTypeSelector;
import org.springframework.integration.scheduling.MessagePublishingErrorHandler;
import org.springframework.integration.scheduling.SimpleMessagingTaskScheduler;
@@ -99,12 +103,12 @@ public class SubscriptionManagerTests {
MessageHandler handler3 = TestHandlers.countingCountDownHandler(counter3, latch);
QueueChannel channel = new QueueChannel();
SubscriptionManager manager = new SubscriptionManager(channel, scheduler);
HandlerEndpoint inactiveEndpoint = createEndpoint(handler1, true);
MessageTarget inactiveEndpoint = createEndpoint(handler1, true);
manager.addTarget(inactiveEndpoint);
manager.addTarget(createEndpoint(handler2, true));
manager.addTarget(createEndpoint(handler3, true));
manager.start();
inactiveEndpoint.stop();
((Lifecycle) inactiveEndpoint).stop();
channel.send(new StringMessage(1, "test"));
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals("messages should have been dispatched within allotted time", 0, latch.getCount());
@@ -123,12 +127,12 @@ public class SubscriptionManagerTests {
MessageHandler handler3 = TestHandlers.countingCountDownHandler(counter3, latch);
QueueChannel channel = new QueueChannel(5, new DispatcherPolicy(true));
SubscriptionManager manager = new SubscriptionManager(channel, scheduler);
HandlerEndpoint inactiveEndpoint = createEndpoint(handler2, true);
MessageTarget inactiveEndpoint = createEndpoint(handler2, true);
manager.addTarget(createEndpoint(handler1, true));
manager.addTarget(inactiveEndpoint);
manager.addTarget(createEndpoint(handler3, true));
manager.start();
inactiveEndpoint.stop();
((Lifecycle) inactiveEndpoint).stop();
channel.send(new StringMessage(1, "test"));
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals("messages should have been dispatched within allotted time", 0, latch.getCount());
@@ -434,9 +438,7 @@ public class SubscriptionManagerTests {
channel.send(new StringMessage(1, "test"));
SubscriptionManager manager = new SubscriptionManager(channel, scheduler);
HandlerEndpoint endpoint1 = new HandlerEndpoint(handler1);
endpoint1.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
HandlerEndpoint endpoint2 = new HandlerEndpoint(handler2);
endpoint2.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
endpoint1.setMessageSelector(new PayloadTypeSelector(Integer.class));
endpoint2.setMessageSelector(new PayloadTypeSelector(String.class));
manager.addTarget(endpoint1);
@@ -449,12 +451,19 @@ public class SubscriptionManagerTests {
}
private static HandlerEndpoint createEndpoint(MessageHandler handler, boolean asynchronous) {
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
private static MessageTarget createEndpoint(MessageHandler handler, boolean asynchronous) {
MessageTarget endpoint = new HandlerEndpoint(handler);
if (asynchronous) {
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
MessageEndpointBeanPostProcessor postProcessor = new MessageEndpointBeanPostProcessor();
((AbstractEndpoint) endpoint).addInterceptor(new ConcurrencyInterceptor(new ConcurrencyPolicy(1, 1)));
endpoint = (MessageTarget) postProcessor.postProcessAfterInitialization(endpoint, "test-endpoint");
}
try {
((InitializingBean) endpoint).afterPropertiesSet();
}
catch (Exception e) {
throw new RuntimeException("failed to initialize endpoint", e);
}
endpoint.afterPropertiesSet();
return endpoint;
}

View File

@@ -30,11 +30,8 @@ import org.springframework.context.Lifecycle;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.MessageTarget;
@@ -81,30 +78,6 @@ public class EndpointParserTests {
assertEquals("test-1-2-3", reply.getPayload());
}
@Test
public void testDefaultConcurrency() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointConcurrencyTests.xml", this.getClass());
HandlerEndpoint endpoint = (HandlerEndpoint) context.getBean("defaultConcurrencyEndpoint");
ConcurrencyPolicy concurrencyPolicy = endpoint.getConcurrencyPolicy();
assertEquals(ConcurrencyPolicy.DEFAULT_CORE_SIZE, concurrencyPolicy.getCoreSize());
assertEquals(ConcurrencyPolicy.DEFAULT_MAX_SIZE, concurrencyPolicy.getMaxSize());
assertEquals(ConcurrencyPolicy.DEFAULT_QUEUE_CAPACITY, concurrencyPolicy.getQueueCapacity());
assertEquals(ConcurrencyPolicy.DEFAULT_KEEP_ALIVE_SECONDS, concurrencyPolicy.getKeepAliveSeconds());
}
@Test
public void testConfiguredConcurrency() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointConcurrencyTests.xml", this.getClass());
HandlerEndpoint endpoint = (HandlerEndpoint) context.getBean("configuredConcurrencyEndpoint");
ConcurrencyPolicy concurrencyPolicy = endpoint.getConcurrencyPolicy();
assertEquals(7, concurrencyPolicy.getCoreSize());
assertEquals(77, concurrencyPolicy.getMaxSize());
assertEquals(777, concurrencyPolicy.getQueueCapacity());
assertEquals(7777, concurrencyPolicy.getKeepAliveSeconds());
}
@Test
public void testEndpointWithSelectorAccepts() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
@@ -132,21 +105,6 @@ public class EndpointParserTests {
assertFalse(endpoint.send(message));
}
@Test
public void testCustomErrorHandler() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithErrorHandler.xml", this.getClass());
MessageTarget endpoint = (MessageTarget) context.getBean("endpoint");
TestErrorHandler errorHandler = (TestErrorHandler) context.getBean("errorHandler");
assertNull(errorHandler.getLastError());
Message<?> message = new StringMessage("test");
endpoint.send(message);
Throwable error = errorHandler.getLastError();
assertEquals(MessageHandlingException.class, error.getClass());
MessageHandlingException exception = (MessageHandlingException) error;
assertEquals(message, exception.getFailedMessage());
}
@Test
public void testCustomReplyHandler() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(

View File

@@ -38,7 +38,6 @@ import org.springframework.integration.bus.interceptor.TestMessageBusStartInterc
import org.springframework.integration.bus.interceptor.TestMessageBusStopInterceptor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dispatcher.DirectChannel;
import org.springframework.integration.endpoint.TargetEndpoint;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.scheduling.SimpleMessagingTaskScheduler;
import org.springframework.integration.scheduling.Subscription;
@@ -128,24 +127,6 @@ public class MessageBusParserTests {
bus.stop();
}
@Test
public void testDefaultConcurrency() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithDefaultConcurrencyTests.xml", this.getClass());
TargetEndpoint endpoint1 = (TargetEndpoint) context.getBean("endpoint1");
assertEquals(4, endpoint1.getConcurrencyPolicy().getCoreSize());
assertEquals(7, endpoint1.getConcurrencyPolicy().getMaxSize());
}
@Test
public void testExplicitConcurrencyTakesPrecedence() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithDefaultConcurrencyTests.xml", this.getClass());
TargetEndpoint endpoint2 = (TargetEndpoint) context.getBean("endpoint2");
assertEquals(14, endpoint2.getConcurrencyPolicy().getCoreSize());
assertEquals(17, endpoint2.getConcurrencyPolicy().getMaxSize());
}
@Test
public void testMessageBusAwareAutomaticallyAddedByNamespace() {
ApplicationContext context = new ClassPathXmlApplicationContext(

View File

@@ -26,11 +26,14 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.aopalliance.aop.Advice;
import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -49,13 +52,14 @@ import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
/**
* @author Mark Fisher
@@ -178,11 +182,18 @@ public class MessagingAnnotationPostProcessorTests {
ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint");
ConcurrencyPolicy concurrencyPolicy = endpoint.getConcurrencyPolicy();
assertEquals(17, concurrencyPolicy.getCoreSize());
assertEquals(42, concurrencyPolicy.getMaxSize());
assertEquals(11, concurrencyPolicy.getQueueCapacity());
assertEquals(123, concurrencyPolicy.getKeepAliveSeconds());
assertEquals(1, endpoint.getInterceptors().size());
Advice interceptor = endpoint.getInterceptors().get(0);
DirectFieldAccessor accessor = new DirectFieldAccessor(interceptor);
ConcurrencyInterceptor concurrencyInterceptor = (ConcurrencyInterceptor)
accessor.getPropertyValue("interceptor");
accessor = new DirectFieldAccessor(concurrencyInterceptor);
ConcurrentTaskExecutor cte = (ConcurrentTaskExecutor) accessor.getPropertyValue("executor");
ThreadPoolExecutor executor = (ThreadPoolExecutor) cte.getConcurrentExecutor();
assertEquals(17, executor.getCorePoolSize());
assertEquals(42, executor.getMaximumPoolSize());
assertEquals(123, executor.getKeepAliveTime(TimeUnit.SECONDS));
assertEquals(11, executor.getQueue().remainingCapacity());
}
@Test(expected=IllegalArgumentException.class)

View File

@@ -36,11 +36,9 @@ import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.MessageHandlerNotRunningException;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
import org.springframework.integration.message.selector.MessageSelectorChain;
import org.springframework.integration.util.ErrorHandler;
/**
* @author Mark Fisher
@@ -141,48 +139,6 @@ public class HandlerEndpointTests {
endpoint.stop();
}
@Test
public void testCustomErrorHandler() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(2);
HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.rejectingCountDownHandler(latch));
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
endpoint.setErrorHandler(new ErrorHandler() {
public void handle(Throwable t) {
latch.countDown();
}
});
endpoint.start();
endpoint.send(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals("both handler and errorHandler should have been invoked", 0, latch.getCount());
}
@Test
public void testConcurrentHandlerWithDefaultReplyChannel() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
public Message<String> handle(Message<?> message) {
latch.countDown();
return new StringMessage("123", "hello " + message.getPayload());
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
endpoint.setChannelRegistry(channelRegistry);
endpoint.setOutputChannelName("replyChannel");
endpoint.start();
endpoint.send(new StringMessage(1, "test"));
latch.await(500, TimeUnit.MILLISECONDS);
endpoint.stop();
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(100);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
}
@Test
public void testHandlerReturnsNull() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
@@ -207,111 +163,6 @@ public class HandlerEndpointTests {
assertNull(reply);
}
@Test
public void testConcurrentHandlerReturnsNull() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
public Message<String> handle(Message<?> message) {
latch.countDown();
return null;
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
endpoint.setChannelRegistry(channelRegistry);
endpoint.setOutputChannelName("replyChannel");
endpoint.start();
endpoint.send(new StringMessage(1, "test"));
latch.await(500, TimeUnit.MILLISECONDS);
endpoint.stop();
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(0);
assertNull(reply);
}
@Test
public void testConcurrentHandlerWithExplicitReplyChannel() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
public Message<String> handle(Message<?> message) {
latch.countDown();
return new StringMessage("123", "hello " + message.getPayload());
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(1, 1));
endpoint.setChannelRegistry(channelRegistry);
endpoint.start();
StringMessage message = new StringMessage(1, "test");
message.getHeader().setReturnAddress("replyChannel");
endpoint.send(message);
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(100);
endpoint.stop();
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
}
@Test
public void testGeneratedConcurrentHandlerWithDefaultReplyChannel() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
public Message<String> handle(Message<?> message) {
latch.countDown();
return new StringMessage("123", "hello " + message.getPayload());
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(3, 14));
endpoint.setOutputChannelName("replyChannel");
endpoint.start();
endpoint.send(new StringMessage(1, "test"));
latch.await(500, TimeUnit.MILLISECONDS);
endpoint.stop();
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(100);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
}
@Test
public void testGeneratedConcurrentHandlerWithExplicitReplyChannel() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);
MessageHandler handler = new MessageHandler() {
public Message<String> handle(Message<?> message) {
latch.countDown();
return new StringMessage("123", "hello " + message.getPayload());
}
};
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
endpoint.setChannelRegistry(channelRegistry);
endpoint.setConcurrencyPolicy(new ConcurrencyPolicy(3, 14));
endpoint.start();
StringMessage message = new StringMessage(1, "test");
message.getHeader().setReturnAddress("replyChannel");
endpoint.send(message);
latch.await(500, TimeUnit.MILLISECONDS);
endpoint.stop();
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
Message<?> reply = replyChannel.receive(100);
assertNotNull(reply);
assertEquals("hello test", reply.getPayload());
}
@Test(expected=MessageHandlerNotRunningException.class)
public void testEndpointDoesNotHandleMessagesWhenNotYetStarted() {
HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler());
@@ -438,84 +289,6 @@ public class HandlerEndpointTests {
endpoint.stop();
}
@Test
public void testDefaultOutputChannelTimeoutSendsToErrorHandler() {
QueueChannel output = new QueueChannel(1);
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("output", output);
HandlerEndpoint endpoint = new HandlerEndpoint(new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
});
endpoint.setOutputChannelName("output");
endpoint.setChannelRegistry(channelRegistry);
TestErrorHandler errorHandler = new TestErrorHandler();
endpoint.setErrorHandler(errorHandler);
endpoint.setReplyTimeout(0);
endpoint.start();
endpoint.send(new StringMessage("test1"));
assertNull(errorHandler.getLastError());
endpoint.send(new StringMessage("test2"));
Throwable error = errorHandler.getLastError();
assertNotNull(error);
assertEquals(MessageDeliveryException.class, error.getClass());
assertEquals("test2", ((MessageDeliveryException) error).getFailedMessage().getPayload());
}
@Test
public void testReturnAddressChannelTimeoutSendsToErrorHandler() {
QueueChannel replyChannel = new QueueChannel(1);
HandlerEndpoint endpoint = new HandlerEndpoint(new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
});
TestErrorHandler errorHandler = new TestErrorHandler();
endpoint.setErrorHandler(errorHandler);
endpoint.setReplyTimeout(0);
endpoint.start();
Message<?> message1 = new StringMessage("test1");
message1.getHeader().setReturnAddress(replyChannel);
endpoint.send(message1);
assertNull(errorHandler.getLastError());
Message<?> message2 = new StringMessage("test2");
message2.getHeader().setReturnAddress(replyChannel);
endpoint.send(message2);
Throwable error = errorHandler.getLastError();
assertNotNull(error);
assertEquals(MessageDeliveryException.class, error.getClass());
assertEquals(message2, ((MessageDeliveryException) error).getFailedMessage());
}
@Test
public void testReturnAddressChannelNameTimeoutSendsToErrorHandler() {
QueueChannel replyChannel = new QueueChannel(1);
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
HandlerEndpoint endpoint = new HandlerEndpoint(new MessageHandler() {
public Message<?> handle(Message<?> message) {
return message;
}
});
endpoint.setChannelRegistry(channelRegistry);
TestErrorHandler errorHandler = new TestErrorHandler();
endpoint.setErrorHandler(errorHandler);
endpoint.setReplyTimeout(10);
endpoint.start();
Message<?> message1 = new StringMessage("test1");
message1.getHeader().setReturnAddress("replyChannel");
endpoint.send(message1);
assertNull(errorHandler.getLastError());
Message<?> message2 = new StringMessage("test2");
message2.getHeader().setReturnAddress("replyChannel");
endpoint.send(message2);
Throwable error = errorHandler.getLastError();
assertNotNull(error);
assertEquals(MessageDeliveryException.class, error.getClass());
assertEquals(message2, ((MessageDeliveryException) error).getFailedMessage());
}
@Test
public void testCorrelationId() {
QueueChannel replyChannel = new QueueChannel(1);
@@ -551,19 +324,4 @@ public class HandlerEndpointTests {
assertEquals("ABC-123", correlationId);
}
private static class TestErrorHandler implements ErrorHandler {
private volatile Throwable lastError;
public void handle(Throwable t) {
this.lastError = t;
}
Throwable getLastError() {
return this.lastError;
}
}
}

View File

@@ -80,7 +80,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel3 = (MessageChannel) context.getBean("channel3");
MessageChannel errorChannel = (MessageChannel) context.getBean("errorChannel");
MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel");
context.start();
StringMessage message = new StringMessage("*");
channel3.send(message);
@@ -93,7 +93,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel3 = (MessageChannel) context.getBean("channel3WithOverride");
MessageChannel errorChannel = (MessageChannel) context.getBean("errorChannel");
MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel");
context.start();
StringMessage message = new StringMessage("*");
channel3.send(message);

View File

@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<si:message-bus error-channel="errorChannel"/>
<si:message-bus error-channel="customErrorChannel"/>
<si:channel id="channel1"/>
<si:channel id="channel2"/>
@@ -16,7 +16,7 @@
<si:channel id="channel1WithOverride"/>
<si:channel id="channel3WithOverride"/>
<si:channel id="replyChannel"/>
<si:channel id="errorChannel"/>
<si:channel id="customErrorChannel"/>
<si:handler-endpoint input-channel="channel1WithOverride" handler="testBean" method="duplicate"
output-channel="channel2" return-address-overrides="true"/>