INT-735 Added ErrorMessage publishing to the DelayHandler for any Exceptions that occur during delayed Message deliveries.

This commit is contained in:
Mark Fisher
2009-07-18 16:13:58 +00:00
parent e929b841d3
commit ad0890c4f6
3 changed files with 186 additions and 27 deletions

View File

@@ -30,7 +30,7 @@ import org.springframework.util.Assert;
*/
public class MapBasedChannelResolver implements ChannelResolver {
private volatile Map<String, MessageChannel> channelMap = new HashMap<String, MessageChannel>();
private volatile Map<String, ? extends MessageChannel> channelMap = new HashMap<String, MessageChannel>();
/**
* Empty constructor for use when providing the channel map via
@@ -43,7 +43,7 @@ public class MapBasedChannelResolver implements ChannelResolver {
* Create a {@link ChannelResolver} that uses the provided Map.
* Each String key will resolve to the associated channel value.
*/
public MapBasedChannelResolver(Map<String, MessageChannel> channelMap) {
public MapBasedChannelResolver(Map<String, ? extends MessageChannel> channelMap) {
this.setChannelMap(channelMap);
}
@@ -51,7 +51,7 @@ public class MapBasedChannelResolver implements ChannelResolver {
* Provide a map of channels to be used by this resolver.
* Each String key will resolve to the associated channel value.
*/
public void setChannelMap(Map<String, MessageChannel> channelMap) {
public void setChannelMap(Map<String, ? extends MessageChannel> channelMap) {
Assert.notNull(channelMap, "channelMap must not be null");
this.channelMap = channelMap;
}

View File

@@ -33,8 +33,12 @@ import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolutionException;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.util.Assert;
@@ -164,6 +168,17 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware,
}
public final void handleMessage(final Message<?> message) {
long delay = this.determineDelayForMessage(message);
if (delay > 0) {
this.releaseMessageAfterDelay(message, delay);
}
else {
// no delay, release directly
this.releaseMessage(message);
}
}
private long determineDelayForMessage(Message<?> message) {
long delay = this.defaultDelay;
if (this.delayHeaderName != null) {
Object headerValue = message.getHeaders().get(this.delayHeaderName);
@@ -182,17 +197,31 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware,
}
}
}
if (delay > 0) {
this.scheduler.schedule(new Runnable() {
public void run() {
return delay;
}
private void releaseMessageAfterDelay(final Message<?> message, long delay) {
this.scheduler.schedule(new Runnable() {
public void run() {
try {
releaseMessage(message);
}
}, delay, TimeUnit.MILLISECONDS);
}
else {
// no delay, release directly
this.releaseMessage(message);
}
catch (Exception e) {
Exception exception = new MessageDeliveryException(message, "Failed to deliver Message after delay.", e);
MessageChannel errorChannel = resolveErrorChannelIfPossible(message);
if (errorChannel != null) {
ErrorMessage errorMessage = new ErrorMessage(exception);
boolean sent = channelTemplate.send(errorMessage, errorChannel);
if (!sent && logger.isWarnEnabled()) {
logger.warn("Failed to send MessageDeliveryException to error channel.", exception);
}
}
else if (logger.isWarnEnabled()) {
logger.warn("No error channel available. MessageDeliveryException will be ignored.", exception);
}
}
}
}, delay, TimeUnit.MILLISECONDS);
}
private void releaseMessage(Message<?> message) {
@@ -203,21 +232,7 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware,
private MessageChannel resolveReplyChannel(Message<?> message) {
MessageChannel replyChannel = this.outputChannel;
if (replyChannel == null) {
Object replyChannelHeader= message.getHeaders().getReplyChannel();
if (replyChannelHeader != null) {
if (replyChannelHeader instanceof MessageChannel) {
replyChannel = (MessageChannel) replyChannelHeader;
}
else if (replyChannelHeader instanceof String) {
Assert.state(this.channelResolver != null,
"ChannelResolver is required for resolving a reply channel by name");
replyChannel = this.channelResolver.resolveChannelName((String) replyChannelHeader);
}
else {
throw new ChannelResolutionException("expected a MessageChannel or String for 'replyChannel', but type is ["
+ replyChannelHeader.getClass() + "]");
}
}
replyChannel = this.resolveChannelFromHeader(message, MessageHeaders.REPLY_CHANNEL);
}
if (replyChannel == null) {
throw new ChannelResolutionException(
@@ -226,6 +241,42 @@ public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware,
return replyChannel;
}
private MessageChannel resolveErrorChannelIfPossible(Message<?> message) {
MessageChannel errorChannel = null;
try {
errorChannel = this.resolveChannelFromHeader(message, MessageHeaders.ERROR_CHANNEL);
}
catch (Exception e) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to resolve error channel from header.", e);
}
}
if (errorChannel == null && this.channelResolver != null) {
errorChannel = this.channelResolver.resolveChannelName(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
}
return errorChannel;
}
private MessageChannel resolveChannelFromHeader(Message<?> message, String headerName) {
MessageChannel channel = null;
Object channelHeader = message.getHeaders().get(headerName);
if (channelHeader != null) {
if (channelHeader instanceof MessageChannel) {
channel = (MessageChannel) channelHeader;
}
else if (channelHeader instanceof String) {
Assert.state(this.channelResolver != null,
"ChannelResolver is required for resolving '" + headerName + "' by name.");
channel = this.channelResolver.resolveChannelName((String) channelHeader);
}
else {
throw new ChannelResolutionException("expected a MessageChannel or String for '" +
headerName + "', but type is [" + channelHeader.getClass() + "]");
}
}
return channel;
}
public void destroy() {
if (this.waitForTasksToCompleteOnShutdown) {
this.scheduler.shutdown();

View File

@@ -28,9 +28,12 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.StringMessage;
@@ -251,6 +254,111 @@ public class DelayHandlerTests {
assertEquals(1, latch.getCount());
}
@Test(expected = UnsupportedOperationException.class)
public void handlerThrowsExceptionWithNoDelay() {
DelayHandler delayHandler = new DelayHandler(0);
delayHandler.setOutputChannel(output);
input.subscribe(delayHandler);
output.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
throw new UnsupportedOperationException("intentional test failure");
}
});
Message<?> message = MessageBuilder.withPayload("test").build();
input.send(message);
}
@Test
public void errorChannelHeaderAndHandlerThrowsExceptionWithDelay() {
DelayHandler delayHandler = new DelayHandler(0);
delayHandler.setDelayHeaderName("delay");
delayHandler.setOutputChannel(output);
DirectChannel errorChannel = new DirectChannel();
ResultHandler resultHandler = new ResultHandler();
errorChannel.subscribe(resultHandler);
input.subscribe(delayHandler);
output.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
throw new UnsupportedOperationException("intentional test failure");
}
});
Message<?> message = MessageBuilder.withPayload("test")
.setHeader("delay", "10")
.setErrorChannel(errorChannel).build();
input.send(message);
this.waitForLatch(1000);
Message<?> errorMessage = resultHandler.lastMessage;
assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass());
MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload();
assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass());
assertSame(message, exceptionPayload.getFailedMessage());
assertNotSame(Thread.currentThread(), resultHandler.lastThread);
}
@Test
public void errorChannelNameHeaderAndHandlerThrowsExceptionWithDelay() {
String errorChannelName = "customErrorChannel";
StaticApplicationContext context = new StaticApplicationContext();
context.registerSingleton(errorChannelName, DirectChannel.class);
context.refresh();
DirectChannel customErrorChannel = (DirectChannel) context.getBean(errorChannelName);
DelayHandler delayHandler = new DelayHandler(0);
delayHandler.setBeanFactory(context);
delayHandler.setDelayHeaderName("delay");
delayHandler.setOutputChannel(output);
ResultHandler resultHandler = new ResultHandler();
customErrorChannel.subscribe(resultHandler);
input.subscribe(delayHandler);
output.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
throw new UnsupportedOperationException("intentional test failure");
}
});
Message<?> message = MessageBuilder.withPayload("test")
.setHeader("delay", "10")
.setErrorChannelName(errorChannelName).build();
input.send(message);
this.waitForLatch(1000);
Message<?> errorMessage = resultHandler.lastMessage;
assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass());
MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload();
assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass());
assertSame(message, exceptionPayload.getFailedMessage());
assertNotSame(Thread.currentThread(), resultHandler.lastThread);
}
@Test
public void defaultErrorChannelAndHandlerThrowsExceptionWithDelay() {
StaticApplicationContext context = new StaticApplicationContext();
context.registerSingleton(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, DirectChannel.class);
context.refresh();
DirectChannel defaultErrorChannel = (DirectChannel) context.getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
DelayHandler delayHandler = new DelayHandler(0);
delayHandler.setBeanFactory(context);
delayHandler.setDelayHeaderName("delay");
delayHandler.setOutputChannel(output);
ResultHandler resultHandler = new ResultHandler();
defaultErrorChannel.subscribe(resultHandler);
input.subscribe(delayHandler);
output.subscribe(new MessageHandler() {
public void handleMessage(Message<?> message) {
throw new UnsupportedOperationException("intentional test failure");
}
});
Message<?> message = MessageBuilder.withPayload("test")
.setHeader("delay", "10").build();
input.send(message);
this.waitForLatch(1000);
Message<?> errorMessage = resultHandler.lastMessage;
assertEquals(MessageDeliveryException.class, errorMessage.getPayload().getClass());
MessageDeliveryException exceptionPayload = (MessageDeliveryException) errorMessage.getPayload();
assertEquals(UnsupportedOperationException.class, exceptionPayload.getCause().getClass());
assertSame(message, exceptionPayload.getFailedMessage());
assertNotSame(Thread.currentThread(), resultHandler.lastThread);
}
private void waitForLatch(long timeout) {
try {