INT-745 RecipientListRouter extends AbstractMessageRouter again. The applySequence boolean has moved up to the base class.

This commit is contained in:
Mark Fisher
2010-04-14 22:18:13 +00:00
parent 22193b378e
commit 0c47f9a0b4
2 changed files with 49 additions and 64 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,11 +17,14 @@
package org.springframework.integration.router;
import java.util.Collection;
import java.util.UUID;
import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageDeliveryException;
/**
@@ -35,6 +38,10 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler {
private volatile boolean resolutionRequired;
private volatile boolean ignoreSendFailures;
private volatile boolean applySequence;
private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate();
@@ -66,16 +73,51 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler {
this.resolutionRequired = resolutionRequired;
}
/**
* Specify whether send failures for one or more of the recipients
* should be ignored. By default this is <code>false</code> meaning
* that an Exception will be thrown whenever a send fails. To override
* this and suppress Exceptions, set the value to <code>true</code>.
*/
public void setIgnoreSendFailures(boolean ignoreSendFailures) {
this.ignoreSendFailures = ignoreSendFailures;
}
/**
* Specify whether to apply the sequence number and size headers to the
* messages prior to sending to the recipient channels. By default, this
* value is <code>false</code> meaning that sequence headers will
* <em>not</em> be applied. If planning to use an Aggregator downstream with
* the default correlation and completion strategies, you should set this
* flag to <code>true</code>.
*/
public void setApplySequence(boolean applySequence) {
this.applySequence = applySequence;
}
@Override
protected void handleMessageInternal(Message<?> message) {
boolean sent = false;
Collection<MessageChannel> results = this.determineTargetChannels(message);
if (results != null) {
int sequenceSize = results.size();
int sequenceNumber = 1;
for (MessageChannel channel : results) {
final Message<?> messageToSend = (!this.applySequence) ? message
: MessageBuilder.fromMessage(message)
.setSequenceNumber(sequenceNumber++)
.setSequenceSize(sequenceSize)
.setCorrelationId(message.getHeaders().getId())
.setHeader(MessageHeaders.ID, UUID.randomUUID())
.build();
if (channel != null) {
if (this.channelTemplate.send(message, channel)) {
if (this.channelTemplate.send(messageToSend, channel)) {
sent = true;
}
else if (!this.ignoreSendFailures) {
throw new MessageDeliveryException(message,
"Router failed to send to channel: " + channel);
}
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,17 +17,12 @@
package org.springframework.integration.router;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.util.Assert;
/**
@@ -38,74 +33,22 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
public class RecipientListRouter extends AbstractMessageHandler implements InitializingBean {
private volatile boolean ignoreSendFailures;
private volatile boolean applySequence;
public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean {
private volatile List<MessageChannel> channels;
private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate();
public void setChannels(List<MessageChannel> channels) {
this.channels = channels;
}
/**
* Set the timeout for sending a message to the resolved channel. By
* default, there is no timeout, meaning the send will block indefinitely.
*/
public void setTimeout(long timeout) {
this.channelTemplate.setSendTimeout(timeout);
}
/**
* Specify whether send failures for one or more of the recipients
* should be ignored. By default this is <code>false</code> meaning
* that an Exception will be thrown whenever a send fails. To override
* this and suppress Exceptions, set the value to <code>true</code>.
*/
public void setIgnoreSendFailures(boolean ignoreSendFailures) {
this.ignoreSendFailures = ignoreSendFailures;
}
/**
* Specify whether to apply the sequence number and size headers to the
* messages prior to sending to the recipient channels. By default, this
* value is <code>false</code> meaning that sequence headers will
* <em>not</em> be applied. If planning to use an Aggregator downstream
* with the default correlation and completion strategies, you should set
* this flag to <code>true</code>.
*/
public void setApplySequence(boolean applySequence) {
this.applySequence = applySequence;
}
public void afterPropertiesSet() {
Assert.notEmpty(this.channels, "a non-empty channel list is required");
}
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
List<MessageChannel> channelList = new ArrayList<MessageChannel>(this.channels);
int sequenceSize = channelList.size();
int sequenceNumber = 1;
for (MessageChannel channel : channelList) {
final Message<?> messageToSend = (!this.applySequence) ? message
: MessageBuilder.fromMessage(message)
.setSequenceNumber(sequenceNumber++)
.setSequenceSize(sequenceSize)
.setCorrelationId(message.getHeaders().getId())
.setHeader(MessageHeaders.ID, UUID.randomUUID())
.build();
boolean sent = this.channelTemplate.send(messageToSend, channel);
if (!sent && !this.ignoreSendFailures) {
throw new MessageDeliveryException(message,
"RecipientListRouter failed to send to channel: " + channel);
}
}
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
return new ArrayList<MessageChannel>(this.channels);
}
}