Added more tests.
This commit is contained in:
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.channel.ChannelRegistry;
|
||||
import org.springframework.integration.channel.DefaultChannelRegistry;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
@@ -35,7 +36,9 @@ import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.handler.TestHandlers;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageBuilder;
|
||||
import org.springframework.integration.message.MessageHandlingException;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.MessageTarget;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.message.selector.MessageSelector;
|
||||
import org.springframework.integration.message.selector.MessageSelectorChain;
|
||||
@@ -46,80 +49,105 @@ import org.springframework.integration.message.selector.MessageSelectorChain;
|
||||
public class HandlerEndpointTests {
|
||||
|
||||
@Test
|
||||
public void testOutputChannel() throws Exception {
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
|
||||
channelRegistry.registerChannel("replyChannel", replyChannel);
|
||||
MessageHandler handler = new MessageHandler() {
|
||||
public Message<String> handle(Message<?> message) {
|
||||
return new StringMessage("hello " + message.getPayload());
|
||||
}
|
||||
};
|
||||
public void outputChannel() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setOutputChannel(channel);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnAddressHeader() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nextTargetHeaderTakesPrecedence() {
|
||||
QueueChannel channel1 = new QueueChannel(1);
|
||||
QueueChannel channel2 = new QueueChannel(1);
|
||||
QueueChannel channel3 = new QueueChannel(1);
|
||||
MessageHandler handler = new TestNextTargetSettingHandler(channel1);
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setOutputChannel(channel2);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel3).build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply1 = channel1.receive(0);
|
||||
assertNotNull(reply1);
|
||||
assertEquals("foo", reply1.getPayload());
|
||||
Message<?> reply2 = channel2.receive(0);
|
||||
assertNull(reply2);
|
||||
Message<?> reply3 = channel3.receive(0);
|
||||
assertNull(reply3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void returnAddressHeaderWithChannelName() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
ChannelRegistry channelRegistry = new DefaultMessageBus();
|
||||
channelRegistry.registerChannel("testChannel", channel);
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
endpoint.setTarget(replyChannel);
|
||||
endpoint.send(new StringMessage("test"));
|
||||
Message<?> reply = replyChannel.receive(50);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress("testChannel").build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("hello test", reply.getPayload());
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplicitReplyChannel() throws Exception {
|
||||
final QueueChannel replyChannel = new QueueChannel();
|
||||
MessageHandler handler = new MessageHandler() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage("hello " + message.getPayload());
|
||||
}
|
||||
};
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
Message<String> testMessage = MessageBuilder.fromPayload("test")
|
||||
.setReturnAddress(replyChannel).build();
|
||||
endpoint.send(testMessage);
|
||||
Message<?> reply = replyChannel.receive(50);
|
||||
assertNotNull(reply);
|
||||
assertEquals("hello test", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplicitReplyChannelName() throws Exception {
|
||||
final QueueChannel replyChannel = new QueueChannel();
|
||||
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
|
||||
channelRegistry.registerChannel("replyChannel", replyChannel);
|
||||
MessageHandler handler = new MessageHandler() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage("hello " + message.getPayload());
|
||||
}
|
||||
};
|
||||
public void nextTargetHeaderWithChannelName() {
|
||||
QueueChannel channel1 = new QueueChannel(1);
|
||||
QueueChannel channel2 = new QueueChannel(1);
|
||||
QueueChannel channel3 = new QueueChannel(1);
|
||||
ChannelRegistry channelRegistry = new DefaultMessageBus();
|
||||
channelRegistry.registerChannel("testChannel", channel1);
|
||||
MessageHandler handler = new TestNextTargetSettingHandler("testChannel");
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
Message<String> testMessage = MessageBuilder.fromPayload("test")
|
||||
.setReturnAddress(replyChannel).build();
|
||||
endpoint.send(testMessage);
|
||||
Message<?> reply = replyChannel.receive(50);
|
||||
assertNotNull(reply);
|
||||
assertEquals("hello test", reply.getPayload());
|
||||
endpoint.setOutputChannel(channel2);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel3).build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply1 = channel1.receive(0);
|
||||
assertNotNull(reply1);
|
||||
assertEquals("foo", reply1.getPayload());
|
||||
Message<?> reply2 = channel2.receive(0);
|
||||
assertNull(reply2);
|
||||
Message<?> reply3 = channel3.receive(0);
|
||||
assertNull(reply3);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicReplyChannel() throws Exception {
|
||||
public void dynamicReplyChannel() throws Exception {
|
||||
final QueueChannel replyChannel1 = new QueueChannel();
|
||||
final QueueChannel replyChannel2 = new QueueChannel();
|
||||
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
|
||||
channelRegistry.registerChannel("replyChannel2", replyChannel2);
|
||||
MessageHandler handler = new MessageHandler() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage("hello " + message.getPayload());
|
||||
return new StringMessage("foo" + message.getPayload());
|
||||
}
|
||||
};
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
Message<String> testMessage1 = MessageBuilder.fromPayload("test")
|
||||
Message<String> testMessage1 = MessageBuilder.fromPayload("bar")
|
||||
.setReturnAddress(replyChannel1).build();
|
||||
endpoint.send(testMessage1);
|
||||
Message<?> reply1 = replyChannel1.receive(50);
|
||||
assertNotNull(reply1);
|
||||
assertEquals("hello test", reply1.getPayload());
|
||||
assertEquals("foobar", reply1.getPayload());
|
||||
Message<?> reply2 = replyChannel2.receive(0);
|
||||
assertNull(reply2);
|
||||
Message<String> testMessage2 = MessageBuilder.fromMessage(testMessage1)
|
||||
@@ -129,33 +157,66 @@ public class HandlerEndpointTests {
|
||||
assertNull(reply1);
|
||||
reply2 = replyChannel2.receive(0);
|
||||
assertNotNull(reply2);
|
||||
assertEquals("hello test", reply2.getPayload());
|
||||
assertEquals("foobar", reply2.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandlerReturnsNull() throws InterruptedException {
|
||||
QueueChannel replyChannel = new QueueChannel();
|
||||
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
|
||||
channelRegistry.registerChannel("replyChannel", replyChannel);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
MessageHandler handler = new MessageHandler() {
|
||||
public Message<String> handle(Message<?> message) {
|
||||
latch.countDown();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
public void noOutputChannelFallsBackToReturnAddress() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setChannelRegistry(channelRegistry);
|
||||
endpoint.setOutputChannelName("replyChannel");
|
||||
endpoint.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals("handler should have been invoked within allotted time", 0, latch.getCount());
|
||||
Message<?> reply = replyChannel.receive(0);
|
||||
assertNull(reply);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void unknownNextTargetChannelFallsBackToOutputChannel() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setOutputChannel(channel);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").setNextTarget("unknown").build();
|
||||
endpoint.send(message);
|
||||
Message<?> reply = channel.receive(0);
|
||||
assertNotNull(reply);
|
||||
assertEquals("FOO", reply.getPayload());
|
||||
}
|
||||
|
||||
@Test(expected = MessageEndpointReplyException.class)
|
||||
public void noReplyTarget() {
|
||||
MessageHandler handler = new TestHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").build();
|
||||
endpoint.send(message);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noReplyMessage() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestNullReplyHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setOutputChannel(channel);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").build();
|
||||
endpoint.send(message);
|
||||
assertNull(channel.receive(0));
|
||||
}
|
||||
|
||||
@Test(expected = MessageHandlingException.class)
|
||||
public void noReplyMessageWithRequiresReply() {
|
||||
QueueChannel channel = new QueueChannel(1);
|
||||
MessageHandler handler = new TestNullReplyHandler();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(handler);
|
||||
endpoint.setRequiresReply(true);
|
||||
endpoint.setOutputChannel(channel);
|
||||
Message<?> message = MessageBuilder.fromPayload("foo").build();
|
||||
endpoint.send(message);
|
||||
}
|
||||
|
||||
@Test(expected=MessageRejectedException.class)
|
||||
public void testEndpointWithSelectorRejecting() {
|
||||
public void endpointWithSelectorRejecting() {
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(TestHandlers.nullHandler());
|
||||
endpoint.setSelector(new MessageSelector() {
|
||||
public boolean accept(Message<?> message) {
|
||||
@@ -166,7 +227,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithSelectorAccepting() throws InterruptedException {
|
||||
public void endpointWithSelectorAccepting() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(TestHandlers.countDownHandler(latch));
|
||||
endpoint.setSelector(new MessageSelector() {
|
||||
@@ -180,7 +241,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithMultipleSelectorsAndFirstRejects() {
|
||||
public void endpointWithMultipleSelectorsAndFirstRejects() {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(TestHandlers.countingHandler(counter));
|
||||
MessageSelectorChain selectorChain = new MessageSelectorChain();
|
||||
@@ -209,7 +270,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithMultipleSelectorsAndFirstAccepts() {
|
||||
public void endpointWithMultipleSelectorsAndFirstAccepts() {
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
AtomicInteger handlerCounter = new AtomicInteger();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(TestHandlers.countingHandler(handlerCounter));
|
||||
@@ -240,7 +301,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEndpointWithMultipleSelectorsAndBothAccept() {
|
||||
public void endpointWithMultipleSelectorsAndBothAccept() {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(TestHandlers.countingHandler(counter));
|
||||
MessageSelectorChain selectorChain = new MessageSelectorChain();
|
||||
@@ -262,7 +323,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCorrelationId() {
|
||||
public void correlationId() {
|
||||
QueueChannel replyChannel = new QueueChannel(1);
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(new MessageHandler() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
@@ -277,7 +338,7 @@ public class HandlerEndpointTests {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCorrelationIdSetByHandlerTakesPrecedence() {
|
||||
public void correlationIdSetByHandlerTakesPrecedence() {
|
||||
QueueChannel replyChannel = new QueueChannel(1);
|
||||
SimpleEndpoint<MessageHandler> endpoint = new SimpleEndpoint<MessageHandler>(new MessageHandler() {
|
||||
public Message<?> handle(Message<?> message) {
|
||||
@@ -294,4 +355,39 @@ public class HandlerEndpointTests {
|
||||
assertEquals("ABC-123", correlationId);
|
||||
}
|
||||
|
||||
|
||||
private static class TestHandler implements MessageHandler {
|
||||
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return new StringMessage(message.getPayload().toString().toUpperCase());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestNextTargetSettingHandler implements MessageHandler {
|
||||
|
||||
private final Object nextTarget;
|
||||
|
||||
TestNextTargetSettingHandler(Object nextTarget) {
|
||||
this.nextTarget = nextTarget;
|
||||
}
|
||||
|
||||
public Message<?> handle(Message<?> message) {
|
||||
if (nextTarget instanceof MessageTarget) {
|
||||
return MessageBuilder.fromPayload(message.getPayload())
|
||||
.setNextTarget((MessageTarget) nextTarget).build();
|
||||
}
|
||||
return MessageBuilder.fromPayload(message.getPayload())
|
||||
.setNextTarget((String) nextTarget).build();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestNullReplyHandler implements MessageHandler {
|
||||
|
||||
public Message<?> handle(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user