diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java index cf6052d939..e452efbd32 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java @@ -25,26 +25,23 @@ import org.apache.commons.logging.LogFactory; import org.springframework.context.Lifecycle; import org.springframework.integration.channel.ChannelInterceptor; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.selector.MessageSelector; import org.springframework.util.Assert; /** * A {@link ChannelInterceptor} that publishes a copy of the intercepted message - * to a secondary channel while still sending the original message to the main channel. + * to a secondary target while still sending the original message to the main channel. * * @author Mark Fisher */ public class WireTap extends ChannelInterceptorAdapter implements Lifecycle { - /** key for the attribute containing the original Message's id */ - public final static String ORIGINAL_MESSAGE_ID_KEY = "_wireTap.originalMessageId"; - - private final Log logger = LogFactory.getLog(this.getClass()); - private final MessageChannel secondaryChannel; + private final MessageTarget target; private final List selectors = new CopyOnWriteArrayList(); @@ -54,22 +51,22 @@ public class WireTap extends ChannelInterceptorAdapter implements Lifecycle { /** * Create a new wire tap with no {@link MessageSelector MessageSelectors}. * - * @param secondaryChannel the channel to which duplicate messages will be sent + * @param target the MessageTarget to which intercepted messages will be sent */ - public WireTap(MessageChannel secondaryChannel) { - Assert.notNull(secondaryChannel, "'secondaryChannel' must not be null"); - this.secondaryChannel = secondaryChannel; + public WireTap(MessageTarget target) { + Assert.notNull(target, "target must not be null"); + this.target = target; } /** * Create a new wire tap with {@link MessageSelector MessageSelectors}. * - * @param secondaryChannel the channel to which duplicate messages will be sent + * @param target the target to which intercepted messages will be sent * @param selectors the list of selectors that must accept a message for it to - * be sent to the secondary channel + * be sent to the intercepting target */ - public WireTap(MessageChannel secondaryChannel, List selectors) { - this(secondaryChannel); + public WireTap(MessageTarget target, List selectors) { + this(target); if (selectors != null) { this.selectors.addAll(selectors); } @@ -100,14 +97,10 @@ public class WireTap extends ChannelInterceptorAdapter implements Lifecycle { @Override public Message preSend(Message message, MessageChannel channel) { if (this.running && this.selectorsAccept(message)) { - Message duplicate = MessageBuilder.fromMessage(message) - .setHeader(ORIGINAL_MESSAGE_ID_KEY, message.getHeaders().getId()) - .build(); - if (!this.secondaryChannel.send(duplicate, 0)) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to send message to secondary channel '" + this.secondaryChannel.getName() - + "'. Check its capacity and whether it has any subscribers."); - } + boolean sent = (this.target instanceof BlockingTarget) ? + ((BlockingTarget) this.target).send(message, 0) : this.target.send(message); + if (!sent && logger.isWarnEnabled()) { + logger.warn("failed to send message to WireTap target '" + this.target + "'"); } } return message; @@ -116,7 +109,7 @@ public class WireTap extends ChannelInterceptorAdapter implements Lifecycle { /** * If this wire tap has any {@link MessageSelector MessageSelectors}, check * whether they accept the current message. If any of them do not accept it, - * the message will not be sent to the secondary channel. + * the message will not be sent to the intercepting target. */ private boolean selectorsAccept(Message message) { for (MessageSelector selector : this.selectors) { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/interceptor/WireTapTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/interceptor/WireTapTests.java index a73a4edbeb..c3eb8548ca 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/interceptor/WireTapTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/interceptor/WireTapTests.java @@ -17,10 +17,8 @@ package org.springframework.integration.channel.interceptor; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; @@ -30,6 +28,7 @@ import org.junit.Test; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.StringMessage; import org.springframework.integration.message.selector.MessageSelector; @@ -39,19 +38,20 @@ import org.springframework.integration.message.selector.MessageSelector; public class WireTapTests { @Test - public void testWireTapWithNoSelectors() { + public void wireTapWithNoSelectors() { QueueChannel mainChannel = new QueueChannel(); QueueChannel secondaryChannel = new QueueChannel(); mainChannel.addInterceptor(new WireTap(secondaryChannel)); mainChannel.send(new StringMessage("testing")); Message original = mainChannel.receive(0); assertNotNull(original); - Message duplicate = secondaryChannel.receive(0); - assertNotNull(duplicate); + Message intercepted = secondaryChannel.receive(0); + assertNotNull(intercepted); + assertEquals(original, intercepted); } @Test - public void testWireTapWithRejectingSelector() { + public void wireTapWithRejectingSelector() { QueueChannel mainChannel = new QueueChannel(); QueueChannel secondaryChannel = new QueueChannel(); List selectors = new ArrayList(); @@ -61,12 +61,12 @@ public class WireTapTests { mainChannel.send(new StringMessage("testing")); Message original = mainChannel.receive(0); assertNotNull(original); - Message duplicate = secondaryChannel.receive(0); - assertNull(duplicate); + Message intercepted = secondaryChannel.receive(0); + assertNull(intercepted); } @Test - public void testWireTapWithAcceptingSelectors() { + public void wireTapWithAcceptingSelectors() { QueueChannel mainChannel = new QueueChannel(); QueueChannel secondaryChannel = new QueueChannel(); List selectors = new ArrayList(); @@ -76,51 +76,33 @@ public class WireTapTests { mainChannel.send(new StringMessage("testing")); Message original = mainChannel.receive(0); assertNotNull(original); - Message duplicate = secondaryChannel.receive(0); - assertNotNull(duplicate); + Message intercepted = secondaryChannel.receive(0); + assertNotNull(intercepted); + assertEquals(original, intercepted); + } + + @Test(expected = IllegalArgumentException.class) + public void wireTapTargetMustNotBeNull() { + new WireTap(null); } @Test - public void testNewMessageIdGeneratedForDuplicate() { + public void simpleTargetWireTap() { QueueChannel mainChannel = new QueueChannel(); - QueueChannel secondaryChannel = new QueueChannel(); - mainChannel.addInterceptor(new WireTap(secondaryChannel)); - mainChannel.send(new StringMessage("testing")); - Message original = mainChannel.receive(0); - Message duplicate = secondaryChannel.receive(0); - Object duplicateId = duplicate.getHeaders().getId(); - assertNotNull(duplicateId); - assertFalse("message ids should not match", original.getHeaders().getId().equals(duplicateId)); - } - - @Test - public void testOriginalIdStoredAsAttribute() { - QueueChannel mainChannel = new QueueChannel(); - QueueChannel secondaryChannel = new QueueChannel(); - mainChannel.addInterceptor(new WireTap(secondaryChannel)); - mainChannel.send(new StringMessage("testing")); - Message original = mainChannel.receive(0); - Message duplicate = secondaryChannel.receive(0); - Object originalIdAttribute = duplicate.getHeaders().get(WireTap.ORIGINAL_MESSAGE_ID_KEY); - assertNotNull(originalIdAttribute); - assertEquals(original.getHeaders().getId(), originalIdAttribute); - } - - @Test - public void testNewTimestampGeneratedForDuplicate() throws InterruptedException { - QueueChannel mainChannel = new QueueChannel(); - QueueChannel secondaryChannel = new QueueChannel(); - mainChannel.addInterceptor(new WireTap(secondaryChannel)); + TestTarget secondaryTarget = new TestTarget(); + mainChannel.addInterceptor(new WireTap(secondaryTarget)); + assertNull(secondaryTarget.getLastMessage()); Message message = new StringMessage("testing"); - Thread.sleep(50); mainChannel.send(message); Message original = mainChannel.receive(0); - Message duplicate = secondaryChannel.receive(0); - assertTrue("original timestamp should precede duplicate", - original.getHeaders().getTimestamp() < duplicate.getHeaders().getTimestamp()); + Message intercepted = secondaryTarget.getLastMessage(); + assertNotNull(original); + assertNotNull(intercepted); + assertEquals(original, intercepted); } - public void testDuplicateMessageContainsHeaderValue() { + @Test + public void interceptedMessageContainsHeaderValue() { QueueChannel mainChannel = new QueueChannel(); QueueChannel secondaryChannel = new QueueChannel(); mainChannel.addInterceptor(new WireTap(secondaryChannel)); @@ -129,12 +111,12 @@ public class WireTapTests { .setHeader(headerName, new Integer(123)).build(); mainChannel.send(message); Message original = mainChannel.receive(0); - Message duplicate = secondaryChannel.receive(0); + Message intercepted = secondaryChannel.receive(0); Object originalAttribute = original.getHeaders().get(headerName); - Object duplicateAttribute = duplicate.getHeaders().get(headerName); + Object interceptedAttribute = intercepted.getHeaders().get(headerName); assertNotNull(originalAttribute); - assertNotNull(duplicateAttribute); - assertEquals(originalAttribute, duplicateAttribute); + assertNotNull(interceptedAttribute); + assertEquals(originalAttribute, interceptedAttribute); } @@ -151,4 +133,18 @@ public class WireTapTests { } } + private static class TestTarget implements MessageTarget { + + private volatile Message lastMessage; + + public boolean send(Message message) { + this.lastMessage = message; + return true; + } + + public Message getLastMessage() { + return this.lastMessage; + } + } + }