INT-734 Added an 'ignore-failures' flag for 'publish-subscribe-channel' (the default it FALSE).

This commit is contained in:
Mark Fisher
2009-07-17 21:26:38 +00:00
parent 51a0104172
commit d52086b934
6 changed files with 127 additions and 10 deletions

View File

@@ -36,6 +36,8 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme
private volatile ErrorHandler errorHandler;
private volatile boolean ignoreFailures;
private volatile boolean applySequence;
@@ -62,6 +64,25 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme
this.errorHandler = errorHandler;
}
/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is <code>false</code> meaning that an Exception
* will be thrown whenever a handler fails. To override this and suppress
* Exceptions, set the value to <code>true</code>.
*/
public void setIgnoreFailures(boolean ignoreFailures) {
this.ignoreFailures = ignoreFailures;
this.getDispatcher().setIgnoreFailures(ignoreFailures);
}
/**
* Specify whether to apply the sequence number and size headers to the
* messages prior to invoking the subscribed handlers. By default, this
* value is <code>false</code> meaning that sequence headers will
* <em>not</em> be applied. If planning to use an Aggregator downstream
* with the default correlation and completion strategies, you should set
* this flag to <code>true</code>.
*/
public void setApplySequence(boolean applySequence) {
this.applySequence = applySequence;
this.getDispatcher().setApplySequence(applySequence);
@@ -76,6 +97,7 @@ public class PublishSubscribeChannel extends AbstractSubscribableChannel impleme
this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler);
}
this.dispatcher = new BroadcastingDispatcher(this.taskExecutor);
this.dispatcher.setIgnoreFailures(this.ignoreFailures);
this.dispatcher.setApplySequence(this.applySequence);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ public class PublishSubscribeChannelParser extends AbstractChannelParser {
builder.addConstructorArgReference(taskExecutorRef);
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "error-handler");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "ignore-failures");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "apply-sequence");
return builder;
}

View File

@@ -26,16 +26,26 @@ import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageHandler;
/**
* A broadcasting dispatcher implementation. It makes a best effort to
* send the message to each of its handlers. If it fails to send to any
* one handler, it will log a warn-level message but continue to send
* to the other handlers.
* A broadcasting dispatcher implementation. If the 'ignoreFailures' property
* is set to <code>false</code> (the default), it will fail fast such that any
* Exception thrown by a MessageHandler may prevent subsequent handlers from
* receiving the Message. However, when a TaskExecutor is provided, the Messages
* may be dispatched in separate Threads so that other handlers are invoked even
* when the 'ignoreFailures' flag is <code>false</code>.
* <p>
* If the 'ignoreFailures' flag is set to <code>true</code> on the other hand,
* it will make a best effort to send the message to each of its handlers. In
* other words, when 'ignoreFailures' is <code>true</code>, if it fails to send
* to any one handler, it will simply log a warn-level message but continue to
* send the Message to any other handlers.
*
* @author Mark Fisher
* @author Iwein Fuld
*/
public class BroadcastingDispatcher extends AbstractDispatcher {
private volatile boolean ignoreFailures;
private volatile boolean applySequence;
private final TaskExecutor taskExecutor;
@@ -50,6 +60,22 @@ public class BroadcastingDispatcher extends AbstractDispatcher {
}
/**
* Specify whether failures for one or more of the handlers should be
* ignored. By default this is <code>false</code> meaning that an
* Exception will be thrown when a handler fails. To override this and
* suppress Exceptions, set the value to <code>true</code>.
* <p>
* Keep in mind that when using a TaskExecutor, even without ignoring the
* failures, other handlers may be invoked after one throws an Exception.
* Since the TaskExecutor is using a different thread, this flag will only
* affect whether an error Message is sent to the error channel or not in
* the case that a TaskExecutor has been configured.
*/
public void setIgnoreFailures(boolean ignoreFailures) {
this.ignoreFailures = ignoreFailures;
}
/**
* Specify whether to apply sequence numbers to the messages
* prior to sending to the handlers. By default, sequence
@@ -59,8 +85,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher {
this.applySequence = applySequence;
}
public boolean dispatch(Message<?> message) {
boolean dispatched = false;
int sequenceNumber = 1;
List<MessageHandler> handlers = this.getHandlers();
int sequenceSize = handlers.size();
@@ -75,15 +101,33 @@ public class BroadcastingDispatcher extends AbstractDispatcher {
if (this.taskExecutor != null) {
this.taskExecutor.execute(new Runnable() {
public void run() {
handler.handleMessage(messageToSend);
invokeHandler(handler, messageToSend);
}
});
dispatched = true;
}
else {
handler.handleMessage(messageToSend);
boolean success = this.invokeHandler(handler, messageToSend);
dispatched = (success || dispatched);
}
}
return true;
return dispatched;
}
private boolean invokeHandler(MessageHandler handler, Message<?> message) {
try {
handler.handleMessage(message);
return true;
}
catch (RuntimeException e) {
if (!this.ignoreFailures) {
throw e;
}
else if (this.logger.isWarnEnabled()) {
logger.warn("Suppressing Exception since 'ignoreFailures' is set to TRUE.", e);
}
return false;
}
}
}

View File

@@ -225,7 +225,21 @@
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="apply-sequence" type="xsd:string" />
<xsd:attribute name="ignore-failures" type="xsd:string" default="false">
<xsd:annotation>
<xsd:documentation>
Specify whether Exceptions thrown by any subscribed handler should be ignored (only logged).
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="apply-sequence" type="xsd:string" default="false">
<xsd:annotation>
<xsd:documentation>
Specify whether the sequence size, sequence number, and correlation id headers should be set on
Messages that are sent through this channel.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>

View File

@@ -48,9 +48,22 @@ public class PublishSubscribeChannelParserTests {
accessor.getPropertyValue("dispatcher");
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
assertNull(dispatcherAccessor.getPropertyValue("taskExecutor"));
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("applySequence"));
}
@Test
public void ignoreFailures() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailures");
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures"));
}
@Test
public void applySequenceEnabled() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
@@ -81,6 +94,25 @@ public class PublishSubscribeChannelParserTests {
assertEquals(context.getBean("pool"), innerExecutor);
}
@Test
public void ignoreFailuresWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"publishSubscribeChannelParserTests.xml", this.getClass());
PublishSubscribeChannel channel = (PublishSubscribeChannel)
context.getBean("channelWithIgnoreFailuresAndTaskExecutor");
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
accessor.getPropertyValue("dispatcher");
DirectFieldAccessor dispatcherAccessor = new DirectFieldAccessor(dispatcher);
assertTrue((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
TaskExecutor executor = (TaskExecutor) dispatcherAccessor.getPropertyValue("taskExecutor");
assertNotNull(executor);
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor");
assertEquals(context.getBean("pool"), innerExecutor);
}
@Test
public void applySequenceEnabledWithTaskExecutor() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(

View File

@@ -9,10 +9,14 @@
<publish-subscribe-channel id="defaultChannel"/>
<publish-subscribe-channel id="channelWithIgnoreFailures" ignore-failures="true"/>
<publish-subscribe-channel id="channelWithApplySequenceEnabled" apply-sequence="true"/>
<publish-subscribe-channel id="channelWithTaskExecutor" task-executor="pool"/>
<publish-subscribe-channel id="channelWithIgnoreFailuresAndTaskExecutor" ignore-failures="true" task-executor="pool"/>
<publish-subscribe-channel id="channelWithApplySequenceEnabledAndTaskExecutor" apply-sequence="true" task-executor="pool"/>
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>