Refactored SimpleDispatcher and added test cases in SimpleDispatcherTests. Also removed MessageHandlerNotRunningException.
This commit is contained in:
@@ -17,18 +17,22 @@
|
||||
package org.springframework.integration.dispatcher;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.endpoint.HandlerEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.handler.TestHandlers;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.MessageHandlingException;
|
||||
import org.springframework.integration.message.MessageRejectedException;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
import org.springframework.integration.message.MessageTarget;
|
||||
import org.springframework.integration.message.selector.MessageSelector;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
@@ -59,9 +63,145 @@ public class SimpleDispatcherTests {
|
||||
assertEquals("only 1 handler should have received the message", 1, counter1.get() + counter2.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandlersWithSelectorsAndOneAccepts() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
dispatcher.setRejectionLimit(5);
|
||||
dispatcher.setRetryInterval(5);
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
final AtomicInteger counter3 = new AtomicInteger();
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
HandlerEndpoint endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
|
||||
HandlerEndpoint endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
|
||||
HandlerEndpoint endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
|
||||
endpoint1.setMessageSelector(new TestMessageSelector(selectorCounter, false));
|
||||
endpoint2.setMessageSelector(new TestMessageSelector(selectorCounter, false));
|
||||
endpoint3.setMessageSelector(new TestMessageSelector(selectorCounter, true));
|
||||
dispatcher.addTarget(endpoint1);
|
||||
dispatcher.addTarget(endpoint2);
|
||||
dispatcher.addTarget(endpoint3);
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
|
||||
assertEquals("handler with rejecting selector should not have received the message", 0, counter1.get());
|
||||
assertEquals("handler with rejecting selector should not have received the message", 0, counter2.get());
|
||||
assertEquals("handler with accepting selector should have received the message", 1, counter3.get());
|
||||
}
|
||||
|
||||
private static MessageTarget createEndpoint(MessageHandler handler) {
|
||||
@Test()
|
||||
public void testHandlersWithSelectorsAndNoneAccept() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
dispatcher.setRejectionLimit(5);
|
||||
dispatcher.setRetryInterval(5);
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
final AtomicInteger counter3 = new AtomicInteger();
|
||||
final AtomicInteger selectorCounter = new AtomicInteger();
|
||||
HandlerEndpoint endpoint1 = createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch));
|
||||
HandlerEndpoint endpoint2 = createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch));
|
||||
HandlerEndpoint endpoint3 = createEndpoint(TestHandlers.countingCountDownHandler(counter3, latch));
|
||||
endpoint1.setMessageSelector(new TestMessageSelector(selectorCounter, false));
|
||||
endpoint2.setMessageSelector(new TestMessageSelector(selectorCounter, false));
|
||||
endpoint3.setMessageSelector(new TestMessageSelector(selectorCounter, false));
|
||||
dispatcher.addTarget(endpoint1);
|
||||
dispatcher.addTarget(endpoint2);
|
||||
dispatcher.addTarget(endpoint3);
|
||||
boolean exceptionThrown = false;
|
||||
try {
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
}
|
||||
catch (MessageRejectedException e) {
|
||||
exceptionThrown = true;
|
||||
}
|
||||
assertTrue(exceptionThrown);
|
||||
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
|
||||
assertEquals("handler with rejecting selector should not have received the message", 0, counter1.get());
|
||||
assertEquals("handler with rejecting selector should not have received the message", 0, counter2.get());
|
||||
assertEquals("handler with rejecting selector should not have received the message", 0, counter3.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHandlersThrowingExceptionUntilRetried() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
dispatcher.setRejectionLimit(5);
|
||||
dispatcher.setRetryInterval(5);
|
||||
final AtomicInteger handlerCounter = new AtomicInteger();
|
||||
TestMessageHandler handler1 = new TestMessageHandler(handlerCounter, 4);
|
||||
TestMessageHandler handler2 = new TestMessageHandler(handlerCounter, 4);
|
||||
TestMessageHandler handler3 = new TestMessageHandler(handlerCounter, 2);
|
||||
HandlerEndpoint endpoint1 = new HandlerEndpoint(handler1);
|
||||
HandlerEndpoint endpoint2 = new HandlerEndpoint(handler2);
|
||||
HandlerEndpoint endpoint3 = new HandlerEndpoint(handler3);
|
||||
dispatcher.addTarget(endpoint1);
|
||||
dispatcher.addTarget(endpoint2);
|
||||
dispatcher.addTarget(endpoint3);
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
assertEquals("handlers should have been invoked 9 times in total", 9, handlerCounter.get());
|
||||
assertFalse("first handler should not have handled the message", handler1.handledMessage);
|
||||
assertFalse("second handler should not have handled the message", handler2.handledMessage);
|
||||
assertTrue("third handler should have handled the message", handler3.handledMessage);
|
||||
}
|
||||
|
||||
|
||||
private static HandlerEndpoint createEndpoint(MessageHandler handler) {
|
||||
return new HandlerEndpoint(handler);
|
||||
}
|
||||
|
||||
|
||||
private static class TestMessageSelector implements MessageSelector {
|
||||
|
||||
private final AtomicInteger counter;
|
||||
|
||||
private final boolean accept;
|
||||
|
||||
TestMessageSelector(AtomicInteger counter, boolean accept) {
|
||||
this.counter = counter;
|
||||
this.accept = accept;
|
||||
}
|
||||
|
||||
public boolean accept(Message<?> message) {
|
||||
this.counter.incrementAndGet();
|
||||
return this.accept;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private static class TestMessageHandler implements MessageHandler {
|
||||
|
||||
private final AtomicInteger internalCounter = new AtomicInteger();
|
||||
|
||||
private final AtomicInteger sharedCounter;
|
||||
|
||||
private final int timesToFail;
|
||||
|
||||
private volatile boolean handledMessage = false;
|
||||
|
||||
TestMessageHandler(AtomicInteger sharedCounter, int timesToReject) {
|
||||
this.sharedCounter = sharedCounter;
|
||||
this.timesToFail = timesToReject;
|
||||
}
|
||||
|
||||
public Message<?> handle(Message<?> message) {
|
||||
int count = internalCounter.incrementAndGet();
|
||||
this.sharedCounter.incrementAndGet();
|
||||
if (this.timesToFail == 0) {
|
||||
this.handledMessage = true;
|
||||
return null;
|
||||
}
|
||||
if (this.timesToFail < 0) {
|
||||
throw new MessageHandlingException(message, "intentional test failure");
|
||||
}
|
||||
if (count > timesToFail) {
|
||||
this.handledMessage = true;
|
||||
return null;
|
||||
}
|
||||
else {
|
||||
throw new MessageHandlingException(message, "intentional test failure");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -56,8 +56,8 @@ public class MessageHeadersTests {
|
||||
assertEquals(value, headers.get("test", Integer.class));
|
||||
}
|
||||
|
||||
@Test(expected = MessagingException.class)
|
||||
public void testTypeMismatchWhenAccessingHeaderValue() {
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testHeaderValueAccessWithIncorrectType() {
|
||||
Integer value = new Integer(123);
|
||||
Map<String, Object> map = new HashMap<String, Object>();
|
||||
map.put("test", value);
|
||||
|
||||
Reference in New Issue
Block a user