diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MapBasedChannelResolver.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MapBasedChannelResolver.java index 20e912f1d4..d65484e7ae 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MapBasedChannelResolver.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MapBasedChannelResolver.java @@ -30,7 +30,7 @@ import org.springframework.util.Assert; */ public class MapBasedChannelResolver implements ChannelResolver { - private volatile Map channelMap = new HashMap(); + private volatile Map channelMap = new HashMap(); /** * Empty constructor for use when providing the channel map via @@ -43,7 +43,7 @@ public class MapBasedChannelResolver implements ChannelResolver { * Create a {@link ChannelResolver} that uses the provided Map. * Each String key will resolve to the associated channel value. */ - public MapBasedChannelResolver(Map channelMap) { + public MapBasedChannelResolver(Map channelMap) { this.setChannelMap(channelMap); } @@ -51,7 +51,7 @@ public class MapBasedChannelResolver implements ChannelResolver { * Provide a map of channels to be used by this resolver. * Each String key will resolve to the associated channel value. */ - public void setChannelMap(Map channelMap) { + public void setChannelMap(Map channelMap) { Assert.notNull(channelMap, "channelMap must not be null"); this.channelMap = channelMap; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java index 90dfe1d627..58ac2147a2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java @@ -33,8 +33,12 @@ import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolutionException; import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.MessageChannelTemplate; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.core.MessageHeaders; +import org.springframework.integration.message.ErrorMessage; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.util.Assert; @@ -164,6 +168,17 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, } public final void handleMessage(final Message message) { + long delay = this.determineDelayForMessage(message); + if (delay > 0) { + this.releaseMessageAfterDelay(message, delay); + } + else { + // no delay, release directly + this.releaseMessage(message); + } + } + + private long determineDelayForMessage(Message message) { long delay = this.defaultDelay; if (this.delayHeaderName != null) { Object headerValue = message.getHeaders().get(this.delayHeaderName); @@ -182,17 +197,31 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, } } } - if (delay > 0) { - this.scheduler.schedule(new Runnable() { - public void run() { + return delay; + } + + private void releaseMessageAfterDelay(final Message message, long delay) { + this.scheduler.schedule(new Runnable() { + public void run() { + try { releaseMessage(message); } - }, delay, TimeUnit.MILLISECONDS); - } - else { - // no delay, release directly - this.releaseMessage(message); - } + catch (Exception e) { + Exception exception = new MessageDeliveryException(message, "Failed to deliver Message after delay.", e); + MessageChannel errorChannel = resolveErrorChannelIfPossible(message); + if (errorChannel != null) { + ErrorMessage errorMessage = new ErrorMessage(exception); + boolean sent = channelTemplate.send(errorMessage, errorChannel); + if (!sent && logger.isWarnEnabled()) { + logger.warn("Failed to send MessageDeliveryException to error channel.", exception); + } + } + else if (logger.isWarnEnabled()) { + logger.warn("No error channel available. MessageDeliveryException will be ignored.", exception); + } + } + } + }, delay, TimeUnit.MILLISECONDS); } private void releaseMessage(Message message) { @@ -203,21 +232,7 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, private MessageChannel resolveReplyChannel(Message message) { MessageChannel replyChannel = this.outputChannel; if (replyChannel == null) { - Object replyChannelHeader= message.getHeaders().getReplyChannel(); - if (replyChannelHeader != null) { - if (replyChannelHeader instanceof MessageChannel) { - replyChannel = (MessageChannel) replyChannelHeader; - } - else if (replyChannelHeader instanceof String) { - Assert.state(this.channelResolver != null, - "ChannelResolver is required for resolving a reply channel by name"); - replyChannel = this.channelResolver.resolveChannelName((String) replyChannelHeader); - } - else { - throw new ChannelResolutionException("expected a MessageChannel or String for 'replyChannel', but type is [" - + replyChannelHeader.getClass() + "]"); - } - } + replyChannel = this.resolveChannelFromHeader(message, MessageHeaders.REPLY_CHANNEL); } if (replyChannel == null) { throw new ChannelResolutionException( @@ -226,6 +241,42 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, return replyChannel; } + private MessageChannel resolveErrorChannelIfPossible(Message message) { + MessageChannel errorChannel = null; + try { + errorChannel = this.resolveChannelFromHeader(message, MessageHeaders.ERROR_CHANNEL); + } + catch (Exception e) { + if (logger.isWarnEnabled()) { + logger.warn("Failed to resolve error channel from header.", e); + } + } + if (errorChannel == null && this.channelResolver != null) { + errorChannel = this.channelResolver.resolveChannelName(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME); + } + return errorChannel; + } + + private MessageChannel resolveChannelFromHeader(Message message, String headerName) { + MessageChannel channel = null; + Object channelHeader = message.getHeaders().get(headerName); + if (channelHeader != null) { + if (channelHeader instanceof MessageChannel) { + channel = (MessageChannel) channelHeader; + } + else if (channelHeader instanceof String) { + Assert.state(this.channelResolver != null, + "ChannelResolver is required for resolving '" + headerName + "' by name."); + channel = this.channelResolver.resolveChannelName((String) channelHeader); + } + else { + throw new ChannelResolutionException("expected a MessageChannel or String for '" + + headerName + "', but type is [" + channelHeader.getClass() + "]"); + } + } + return channel; + } + public void destroy() { if (this.waitForTasksToCompleteOnShutdown) { this.scheduler.shutdown(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/DelayHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/DelayHandlerTests.java index 75af651edc..04dd2ff0d0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/DelayHandlerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/DelayHandlerTests.java @@ -28,9 +28,12 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.context.support.StaticApplicationContext; import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Message; import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.message.StringMessage; @@ -251,6 +254,111 @@ public class DelayHandlerTests { assertEquals(1, latch.getCount()); } + @Test(expected = UnsupportedOperationException.class) + public void handlerThrowsExceptionWithNoDelay() { + DelayHandler delayHandler = new DelayHandler(0); + delayHandler.setOutputChannel(output); + input.subscribe(delayHandler); + output.subscribe(new MessageHandler() { + public void handleMessage(Message message) { + throw new UnsupportedOperationException("intentional test failure"); + } + }); + Message message = MessageBuilder.withPayload("test").build(); + input.send(message); + } + + @Test + public void errorChannelHeaderAndHandlerThrowsExceptionWithDelay() { + DelayHandler delayHandler = new DelayHandler(0); + delayHandler.setDelayHeaderName("delay"); + delayHandler.setOutputChannel(output); + DirectChannel errorChannel = new DirectChannel(); + ResultHandler resultHandler = new ResultHandler(); + errorChannel.subscribe(resultHandler); + input.subscribe(delayHandler); + output.subscribe(new MessageHandler() { + public void handleMessage(Message message) { + throw new UnsupportedOperationException("intentional test failure"); + } + }); + Message message = MessageBuilder.withPayload("test") + .setHeader("delay", "10") + .setErrorChannel(errorChannel).build(); + input.send(message); + this.waitForLatch(1000); + Message errorMessage = resultHandler.lastMessage; + assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass()); + MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload(); + assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass()); + assertSame(message, exceptionPayload.getFailedMessage()); + assertNotSame(Thread.currentThread(), resultHandler.lastThread); + } + + @Test + public void errorChannelNameHeaderAndHandlerThrowsExceptionWithDelay() { + String errorChannelName = "customErrorChannel"; + StaticApplicationContext context = new StaticApplicationContext(); + context.registerSingleton(errorChannelName, DirectChannel.class); + context.refresh(); + DirectChannel customErrorChannel = (DirectChannel) context.getBean(errorChannelName); + DelayHandler delayHandler = new DelayHandler(0); + delayHandler.setBeanFactory(context); + delayHandler.setDelayHeaderName("delay"); + delayHandler.setOutputChannel(output); + ResultHandler resultHandler = new ResultHandler(); + customErrorChannel.subscribe(resultHandler); + input.subscribe(delayHandler); + output.subscribe(new MessageHandler() { + public void handleMessage(Message message) { + throw new UnsupportedOperationException("intentional test failure"); + } + }); + Message message = MessageBuilder.withPayload("test") + .setHeader("delay", "10") + .setErrorChannelName(errorChannelName).build(); + input.send(message); + this.waitForLatch(1000); + Message errorMessage = resultHandler.lastMessage; + assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass()); + MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload(); + assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass()); + assertSame(message, exceptionPayload.getFailedMessage()); + assertNotSame(Thread.currentThread(), resultHandler.lastThread); + } + + @Test + public void defaultErrorChannelAndHandlerThrowsExceptionWithDelay() { + StaticApplicationContext context = new StaticApplicationContext(); + context.registerSingleton( + IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, DirectChannel.class); + context.refresh(); + DirectChannel defaultErrorChannel = (DirectChannel) context.getBean( + IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME); + DelayHandler delayHandler = new DelayHandler(0); + delayHandler.setBeanFactory(context); + delayHandler.setDelayHeaderName("delay"); + delayHandler.setOutputChannel(output); + ResultHandler resultHandler = new ResultHandler(); + defaultErrorChannel.subscribe(resultHandler); + input.subscribe(delayHandler); + output.subscribe(new MessageHandler() { + public void handleMessage(Message message) { + throw new UnsupportedOperationException("intentional test failure"); + } + }); + Message message = MessageBuilder.withPayload("test") + .setHeader("delay", "10").build(); + input.send(message); + this.waitForLatch(1000); + Message errorMessage = resultHandler.lastMessage; + assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass()); + MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload(); + assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass()); + assertSame(message, exceptionPayload.getFailedMessage()); + assertNotSame(Thread.currentThread(), resultHandler.lastThread); + } + private void waitForLatch(long timeout) { try {