From 0c47f9a0b45b95bc30278aaa31bdfacf6db3f6a1 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Wed, 14 Apr 2010 22:18:13 +0000 Subject: [PATCH] INT-745 RecipientListRouter extends AbstractMessageRouter again. The applySequence boolean has moved up to the base class. --- .../router/AbstractMessageRouter.java | 46 ++++++++++++- .../router/RecipientListRouter.java | 67 ++----------------- 2 files changed, 49 insertions(+), 64 deletions(-) diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java index a87ce23812..e0e419dfe2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2008 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,14 @@ package org.springframework.integration.router; import java.util.Collection; +import java.util.UUID; import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.core.MessageHeaders; import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageDeliveryException; /** @@ -35,6 +38,10 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler { private volatile boolean resolutionRequired; + private volatile boolean ignoreSendFailures; + + private volatile boolean applySequence; + private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); @@ -66,16 +73,51 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler { this.resolutionRequired = resolutionRequired; } + /** + * Specify whether send failures for one or more of the recipients + * should be ignored. By default this is false meaning + * that an Exception will be thrown whenever a send fails. To override + * this and suppress Exceptions, set the value to true. + */ + public void setIgnoreSendFailures(boolean ignoreSendFailures) { + this.ignoreSendFailures = ignoreSendFailures; + } + + /** + * Specify whether to apply the sequence number and size headers to the + * messages prior to sending to the recipient channels. By default, this + * value is false meaning that sequence headers will + * not be applied. If planning to use an Aggregator downstream with + * the default correlation and completion strategies, you should set this + * flag to true. + */ + public void setApplySequence(boolean applySequence) { + this.applySequence = applySequence; + } + @Override protected void handleMessageInternal(Message message) { boolean sent = false; Collection results = this.determineTargetChannels(message); if (results != null) { + int sequenceSize = results.size(); + int sequenceNumber = 1; for (MessageChannel channel : results) { + final Message messageToSend = (!this.applySequence) ? message + : MessageBuilder.fromMessage(message) + .setSequenceNumber(sequenceNumber++) + .setSequenceSize(sequenceSize) + .setCorrelationId(message.getHeaders().getId()) + .setHeader(MessageHeaders.ID, UUID.randomUUID()) + .build(); if (channel != null) { - if (this.channelTemplate.send(message, channel)) { + if (this.channelTemplate.send(messageToSend, channel)) { sent = true; } + else if (!this.ignoreSendFailures) { + throw new MessageDeliveryException(message, + "Router failed to send to channel: " + channel); + } } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java index f7cba58170..e79e057403 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RecipientListRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2009 the original author or authors. + * Copyright 2002-2010 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,17 +17,12 @@ package org.springframework.integration.router; import java.util.ArrayList; +import java.util.Collection; import java.util.List; -import java.util.UUID; import org.springframework.beans.factory.InitializingBean; -import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; -import org.springframework.integration.core.MessageHeaders; -import org.springframework.integration.handler.AbstractMessageHandler; -import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageDeliveryException; import org.springframework.util.Assert; /** @@ -38,74 +33,22 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class RecipientListRouter extends AbstractMessageHandler implements InitializingBean { - - private volatile boolean ignoreSendFailures; - - private volatile boolean applySequence; +public class RecipientListRouter extends AbstractMessageRouter implements InitializingBean { private volatile List channels; - private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); - public void setChannels(List channels) { this.channels = channels; } - /** - * Set the timeout for sending a message to the resolved channel. By - * default, there is no timeout, meaning the send will block indefinitely. - */ - public void setTimeout(long timeout) { - this.channelTemplate.setSendTimeout(timeout); - } - - /** - * Specify whether send failures for one or more of the recipients - * should be ignored. By default this is false meaning - * that an Exception will be thrown whenever a send fails. To override - * this and suppress Exceptions, set the value to true. - */ - public void setIgnoreSendFailures(boolean ignoreSendFailures) { - this.ignoreSendFailures = ignoreSendFailures; - } - - /** - * Specify whether to apply the sequence number and size headers to the - * messages prior to sending to the recipient channels. By default, this - * value is false meaning that sequence headers will - * not be applied. If planning to use an Aggregator downstream - * with the default correlation and completion strategies, you should set - * this flag to true. - */ - public void setApplySequence(boolean applySequence) { - this.applySequence = applySequence; - } - public void afterPropertiesSet() { Assert.notEmpty(this.channels, "a non-empty channel list is required"); } @Override - protected void handleMessageInternal(Message message) throws Exception { - List channelList = new ArrayList(this.channels); - int sequenceSize = channelList.size(); - int sequenceNumber = 1; - for (MessageChannel channel : channelList) { - final Message messageToSend = (!this.applySequence) ? message - : MessageBuilder.fromMessage(message) - .setSequenceNumber(sequenceNumber++) - .setSequenceSize(sequenceSize) - .setCorrelationId(message.getHeaders().getId()) - .setHeader(MessageHeaders.ID, UUID.randomUUID()) - .build(); - boolean sent = this.channelTemplate.send(messageToSend, channel); - if (!sent && !this.ignoreSendFailures) { - throw new MessageDeliveryException(message, - "RecipientListRouter failed to send to channel: " + channel); - } - } + protected Collection determineTargetChannels(Message message) { + return new ArrayList(this.channels); } }