WireTap now accepts any MessageTarget for sending intercepted Messages instead of requiring a MessageChannel.
This commit is contained in:
@@ -17,10 +17,8 @@
|
||||
package org.springframework.integration.channel.interceptor;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -30,6 +28,7 @@ import org.junit.Test;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageTarget;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.message.selector.MessageSelector;
|
||||
|
||||
@@ -39,19 +38,20 @@ import org.springframework.integration.message.selector.MessageSelector;
|
||||
public class WireTapTests {
|
||||
|
||||
@Test
|
||||
public void testWireTapWithNoSelectors() {
|
||||
public void wireTapWithNoSelectors() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryChannel));
|
||||
mainChannel.send(new StringMessage("testing"));
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
assertNotNull(original);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
assertNotNull(duplicate);
|
||||
Message<?> intercepted = secondaryChannel.receive(0);
|
||||
assertNotNull(intercepted);
|
||||
assertEquals(original, intercepted);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWireTapWithRejectingSelector() {
|
||||
public void wireTapWithRejectingSelector() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
List<MessageSelector> selectors = new ArrayList<MessageSelector>();
|
||||
@@ -61,12 +61,12 @@ public class WireTapTests {
|
||||
mainChannel.send(new StringMessage("testing"));
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
assertNotNull(original);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
assertNull(duplicate);
|
||||
Message<?> intercepted = secondaryChannel.receive(0);
|
||||
assertNull(intercepted);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWireTapWithAcceptingSelectors() {
|
||||
public void wireTapWithAcceptingSelectors() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
List<MessageSelector> selectors = new ArrayList<MessageSelector>();
|
||||
@@ -76,51 +76,33 @@ public class WireTapTests {
|
||||
mainChannel.send(new StringMessage("testing"));
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
assertNotNull(original);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
assertNotNull(duplicate);
|
||||
Message<?> intercepted = secondaryChannel.receive(0);
|
||||
assertNotNull(intercepted);
|
||||
assertEquals(original, intercepted);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void wireTapTargetMustNotBeNull() {
|
||||
new WireTap(null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewMessageIdGeneratedForDuplicate() {
|
||||
public void simpleTargetWireTap() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryChannel));
|
||||
mainChannel.send(new StringMessage("testing"));
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
Object duplicateId = duplicate.getHeaders().getId();
|
||||
assertNotNull(duplicateId);
|
||||
assertFalse("message ids should not match", original.getHeaders().getId().equals(duplicateId));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOriginalIdStoredAsAttribute() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryChannel));
|
||||
mainChannel.send(new StringMessage("testing"));
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
Object originalIdAttribute = duplicate.getHeaders().get(WireTap.ORIGINAL_MESSAGE_ID_KEY);
|
||||
assertNotNull(originalIdAttribute);
|
||||
assertEquals(original.getHeaders().getId(), originalIdAttribute);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNewTimestampGeneratedForDuplicate() throws InterruptedException {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryChannel));
|
||||
TestTarget secondaryTarget = new TestTarget();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryTarget));
|
||||
assertNull(secondaryTarget.getLastMessage());
|
||||
Message<?> message = new StringMessage("testing");
|
||||
Thread.sleep(50);
|
||||
mainChannel.send(message);
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
assertTrue("original timestamp should precede duplicate",
|
||||
original.getHeaders().getTimestamp() < duplicate.getHeaders().getTimestamp());
|
||||
Message<?> intercepted = secondaryTarget.getLastMessage();
|
||||
assertNotNull(original);
|
||||
assertNotNull(intercepted);
|
||||
assertEquals(original, intercepted);
|
||||
}
|
||||
|
||||
public void testDuplicateMessageContainsHeaderValue() {
|
||||
@Test
|
||||
public void interceptedMessageContainsHeaderValue() {
|
||||
QueueChannel mainChannel = new QueueChannel();
|
||||
QueueChannel secondaryChannel = new QueueChannel();
|
||||
mainChannel.addInterceptor(new WireTap(secondaryChannel));
|
||||
@@ -129,12 +111,12 @@ public class WireTapTests {
|
||||
.setHeader(headerName, new Integer(123)).build();
|
||||
mainChannel.send(message);
|
||||
Message<?> original = mainChannel.receive(0);
|
||||
Message<?> duplicate = secondaryChannel.receive(0);
|
||||
Message<?> intercepted = secondaryChannel.receive(0);
|
||||
Object originalAttribute = original.getHeaders().get(headerName);
|
||||
Object duplicateAttribute = duplicate.getHeaders().get(headerName);
|
||||
Object interceptedAttribute = intercepted.getHeaders().get(headerName);
|
||||
assertNotNull(originalAttribute);
|
||||
assertNotNull(duplicateAttribute);
|
||||
assertEquals(originalAttribute, duplicateAttribute);
|
||||
assertNotNull(interceptedAttribute);
|
||||
assertEquals(originalAttribute, interceptedAttribute);
|
||||
}
|
||||
|
||||
|
||||
@@ -151,4 +133,18 @@ public class WireTapTests {
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestTarget implements MessageTarget {
|
||||
|
||||
private volatile Message<?> lastMessage;
|
||||
|
||||
public boolean send(Message<?> message) {
|
||||
this.lastMessage = message;
|
||||
return true;
|
||||
}
|
||||
|
||||
public Message<?> getLastMessage() {
|
||||
return this.lastMessage;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user