diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index e07a9a0ee6..786464fdb5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -36,6 +36,8 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme private volatile ErrorHandler errorHandler; + private volatile boolean ignoreFailures; + private volatile boolean applySequence; @@ -62,6 +64,25 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme this.errorHandler = errorHandler; } + /** + * Specify whether failures for one or more of the handlers should be + * ignored. By default this is false meaning that an Exception + * will be thrown whenever a handler fails. To override this and suppress + * Exceptions, set the value to true. + */ + public void setIgnoreFailures(boolean ignoreFailures) { + this.ignoreFailures = ignoreFailures; + this.getDispatcher().setIgnoreFailures(ignoreFailures); + } + + /** + * Specify whether to apply the sequence number and size headers to the + * messages prior to invoking the subscribed handlers. By default, this + * value is false meaning that sequence headers will + * not be applied. If planning to use an Aggregator downstream + * with the default correlation and completion strategies, you should set + * this flag to true. + */ public void setApplySequence(boolean applySequence) { this.applySequence = applySequence; this.getDispatcher().setApplySequence(applySequence); @@ -76,6 +97,7 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler); } this.dispatcher = new BroadcastingDispatcher(this.taskExecutor); + this.dispatcher.setIgnoreFailures(this.ignoreFailures); this.dispatcher.setApplySequence(this.applySequence); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PublishSubscribeChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PublishSubscribeChannelParser.java index f2a18be536..754e8b160a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PublishSubscribeChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PublishSubscribeChannelParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2009 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,6 +38,7 @@ public class PublishSubscribeChannelParser extends AbstractChannelParser { builder.addConstructorArgReference(taskExecutorRef); } IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-handler"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ignore-failures"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence"); return builder; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index c129f470cb..12752b7c17 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -26,16 +26,26 @@ import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageHandler; /** - * A broadcasting dispatcher implementation. It makes a best effort to - * send the message to each of its handlers. If it fails to send to any - * one handler, it will log a warn-level message but continue to send - * to the other handlers. + * A broadcasting dispatcher implementation. If the 'ignoreFailures' property + * is set to false (the default), it will fail fast such that any + * Exception thrown by a MessageHandler may prevent subsequent handlers from + * receiving the Message. However, when a TaskExecutor is provided, the Messages + * may be dispatched in separate Threads so that other handlers are invoked even + * when the 'ignoreFailures' flag is false. + *

+ * If the 'ignoreFailures' flag is set to true on the other hand, + * it will make a best effort to send the message to each of its handlers. In + * other words, when 'ignoreFailures' is true, if it fails to send + * to any one handler, it will simply log a warn-level message but continue to + * send the Message to any other handlers. * * @author Mark Fisher * @author Iwein Fuld */ public class BroadcastingDispatcher extends AbstractDispatcher { + private volatile boolean ignoreFailures; + private volatile boolean applySequence; private final TaskExecutor taskExecutor; @@ -50,6 +60,22 @@ public class BroadcastingDispatcher extends AbstractDispatcher { } + /** + * Specify whether failures for one or more of the handlers should be + * ignored. By default this is false meaning that an + * Exception will be thrown when a handler fails. To override this and + * suppress Exceptions, set the value to true. + *

+ * Keep in mind that when using a TaskExecutor, even without ignoring the + * failures, other handlers may be invoked after one throws an Exception. + * Since the TaskExecutor is using a different thread, this flag will only + * affect whether an error Message is sent to the error channel or not in + * the case that a TaskExecutor has been configured. + */ + public void setIgnoreFailures(boolean ignoreFailures) { + this.ignoreFailures = ignoreFailures; + } + /** * Specify whether to apply sequence numbers to the messages * prior to sending to the handlers. By default, sequence @@ -59,8 +85,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher { this.applySequence = applySequence; } - public boolean dispatch(Message message) { + boolean dispatched = false; int sequenceNumber = 1; List handlers = this.getHandlers(); int sequenceSize = handlers.size(); @@ -75,15 +101,33 @@ public class BroadcastingDispatcher extends AbstractDispatcher { if (this.taskExecutor != null) { this.taskExecutor.execute(new Runnable() { public void run() { - handler.handleMessage(messageToSend); + invokeHandler(handler, messageToSend); } }); + dispatched = true; } else { - handler.handleMessage(messageToSend); + boolean success = this.invokeHandler(handler, messageToSend); + dispatched = (success || dispatched); } } - return true; + return dispatched; + } + + private boolean invokeHandler(MessageHandler handler, Message message) { + try { + handler.handleMessage(message); + return true; + } + catch (RuntimeException e) { + if (!this.ignoreFailures) { + throw e; + } + else if (this.logger.isWarnEnabled()) { + logger.warn("Suppressing Exception since 'ignoreFailures' is set to TRUE.", e); + } + return false; + } } } diff --git a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd index ccd6a5c825..f2ec4aaa91 100644 --- a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -225,7 +225,21 @@ - + + + + Specify whether Exceptions thrown by any subscribed handler should be ignored (only logged). + + + + + + + Specify whether the sequence size, sequence number, and correlation id headers should be set on + Messages that are sent through this channel. + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PublishSubscribeChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PublishSubscribeChannelParserTests.java index 9b2a80c6e4..aa09fb52df 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/PublishSubscribeChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PublishSubscribeChannelParserTests.java @@ -48,9 +48,22 @@ public class PublishSubscribeChannelParserTests { accessor.getPropertyValue("dispatcher"); DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher); assertNull(dispatcherAccessor.getPropertyValue("taskExecutor")); + assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures")); assertFalse((Boolean) dispatcherAccessor.getPropertyValue("applySequence")); } + @Test + public void ignoreFailures() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "publishSubscribeChannelParserTests.xml", this.getClass()); + PublishSubscribeChannel channel = (PublishSubscribeChannel) + context.getBean("channelWithIgnoreFailures"); + DirectFieldAccessor accessor = new DirectFieldAccessor(channel); + BroadcastingDispatcher dispatcher = (BroadcastingDispatcher) + accessor.getPropertyValue("dispatcher"); + assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures")); + } + @Test public void applySequenceEnabled() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( @@ -81,6 +94,25 @@ public class PublishSubscribeChannelParserTests { assertEquals(context.getBean("pool"), innerExecutor); } + @Test + public void ignoreFailuresWithTaskExecutor() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "publishSubscribeChannelParserTests.xml", this.getClass()); + PublishSubscribeChannel channel = (PublishSubscribeChannel) + context.getBean("channelWithIgnoreFailuresAndTaskExecutor"); + DirectFieldAccessor accessor = new DirectFieldAccessor(channel); + BroadcastingDispatcher dispatcher = (BroadcastingDispatcher) + accessor.getPropertyValue("dispatcher"); + DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher); + assertTrue((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures")); + TaskExecutor executor = (TaskExecutor) dispatcherAccessor.getPropertyValue("taskExecutor"); + assertNotNull(executor); + assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass()); + DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor); + TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor"); + assertEquals(context.getBean("pool"), innerExecutor); + } + @Test public void applySequenceEnabledWithTaskExecutor() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/publishSubscribeChannelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/publishSubscribeChannelParserTests.xml index 4ab0788254..7a8808eb84 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/publishSubscribeChannelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/publishSubscribeChannelParserTests.xml @@ -9,10 +9,14 @@ + + + +