diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java
index a87ce23812..e0e419dfe2 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2008 the original author or authors.
+ * Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,11 +17,14 @@
package org.springframework.integration.router;
import java.util.Collection;
+import java.util.UUID;
import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.handler.AbstractMessageHandler;
+import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageDeliveryException;
/**
@@ -35,6 +38,10 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler {
private volatile boolean resolutionRequired;
+ private volatile boolean ignoreSendFailures;
+
+ private volatile boolean applySequence;
+
private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate();
@@ -66,16 +73,51 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler {
this.resolutionRequired = resolutionRequired;
}
+ /**
+ * Specify whether send failures for one or more of the recipients
+ * should be ignored. By default this is false meaning
+ * that an Exception will be thrown whenever a send fails. To override
+ * this and suppress Exceptions, set the value to true.
+ */
+ public void setIgnoreSendFailures(boolean ignoreSendFailures) {
+ this.ignoreSendFailures = ignoreSendFailures;
+ }
+
+ /**
+ * Specify whether to apply the sequence number and size headers to the
+ * messages prior to sending to the recipient channels. By default, this
+ * value is false meaning that sequence headers will
+ * not be applied. If planning to use an Aggregator downstream with
+ * the default correlation and completion strategies, you should set this
+ * flag to true.
+ */
+ public void setApplySequence(boolean applySequence) {
+ this.applySequence = applySequence;
+ }
+
@Override
protected void handleMessageInternal(Message> message) {
boolean sent = false;
Collection results = this.determineTargetChannels(message);
if (results != null) {
+ int sequenceSize = results.size();
+ int sequenceNumber = 1;
for (MessageChannel channel : results) {
+ final Message> messageToSend = (!this.applySequence) ? message
+ : MessageBuilder.fromMessage(message)
+ .setSequenceNumber(sequenceNumber++)
+ .setSequenceSize(sequenceSize)
+ .setCorrelationId(message.getHeaders().getId())
+ .setHeader(MessageHeaders.ID, UUID.randomUUID())
+ .build();
if (channel != null) {
- if (this.channelTemplate.send(message, channel)) {
+ if (this.channelTemplate.send(messageToSend, channel)) {
sent = true;
}
+ else if (!this.ignoreSendFailures) {
+ throw new MessageDeliveryException(message,
+ "Router failed to send to channel: " + channel);
+ }
}
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java
index f7cba58170..e79e057403 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2009 the original author or authors.
+ * Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,17 +17,12 @@
package org.springframework.integration.router;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import org.springframework.beans.factory.InitializingBean;
-import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
-import org.springframework.integration.core.MessageHeaders;
-import org.springframework.integration.handler.AbstractMessageHandler;
-import org.springframework.integration.message.MessageBuilder;
-import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.util.Assert;
/**
@@ -38,74 +33,22 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public class RecipientListRouter extends AbstractMessageHandler implements InitializingBean {
-
- private volatile boolean ignoreSendFailures;
-
- private volatile boolean applySequence;
+public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean {
private volatile List channels;
- private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate();
-
public void setChannels(List channels) {
this.channels = channels;
}
- /**
- * Set the timeout for sending a message to the resolved channel. By
- * default, there is no timeout, meaning the send will block indefinitely.
- */
- public void setTimeout(long timeout) {
- this.channelTemplate.setSendTimeout(timeout);
- }
-
- /**
- * Specify whether send failures for one or more of the recipients
- * should be ignored. By default this is false meaning
- * that an Exception will be thrown whenever a send fails. To override
- * this and suppress Exceptions, set the value to true.
- */
- public void setIgnoreSendFailures(boolean ignoreSendFailures) {
- this.ignoreSendFailures = ignoreSendFailures;
- }
-
- /**
- * Specify whether to apply the sequence number and size headers to the
- * messages prior to sending to the recipient channels. By default, this
- * value is false meaning that sequence headers will
- * not be applied. If planning to use an Aggregator downstream
- * with the default correlation and completion strategies, you should set
- * this flag to true.
- */
- public void setApplySequence(boolean applySequence) {
- this.applySequence = applySequence;
- }
-
public void afterPropertiesSet() {
Assert.notEmpty(this.channels, "a non-empty channel list is required");
}
@Override
- protected void handleMessageInternal(Message> message) throws Exception {
- List channelList = new ArrayList(this.channels);
- int sequenceSize = channelList.size();
- int sequenceNumber = 1;
- for (MessageChannel channel : channelList) {
- final Message> messageToSend = (!this.applySequence) ? message
- : MessageBuilder.fromMessage(message)
- .setSequenceNumber(sequenceNumber++)
- .setSequenceSize(sequenceSize)
- .setCorrelationId(message.getHeaders().getId())
- .setHeader(MessageHeaders.ID, UUID.randomUUID())
- .build();
- boolean sent = this.channelTemplate.send(messageToSend, channel);
- if (!sent && !this.ignoreSendFailures) {
- throw new MessageDeliveryException(message,
- "RecipientListRouter failed to send to channel: " + channel);
- }
- }
+ protected Collection determineTargetChannels(Message> message) {
+ return new ArrayList(this.channels);
}
}