SimpleDispatcher no longer continues to retry sending any time it catches an Exception. Instead, it will try to send to each of its targets until one of them accepts the message (returns true). If a target returns false (e.g. for a timeout) or throws a MessageRejectedException (e.g. a MessageSelector did not accept the Message), then it will continue trying its other targets. However, any other Exceptions will be re-thrown immediately. Also, it no longer attempts to send to its targets more than once. Thus, the 'rejectionLimit' and 'retryInterval' properties have been removed. The retry behavior actually belongs in an interceptor/template on a per-MessageTarget basis. Some targets are retry-able (e.g. might throw a RemoteAccessException), but others are not. Therefore, the SimpleDispatcher should not have this awareness; such configuration options belong with the individual target instead (part of INT-337).

This commit is contained in:
Mark Fisher
2008-08-18 19:36:04 +00:00
parent b4701c0150
commit 0ec6cd61d9
2 changed files with 60 additions and 162 deletions

View File

@@ -30,7 +30,6 @@ import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
@@ -69,7 +68,7 @@ public class SimpleDispatcherTests {
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target = new CountingFalseReturningTestTarget(counter);
MessageTarget target = new CountingTestTarget(counter, false);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test"));
@@ -80,9 +79,9 @@ public class SimpleDispatcherTests {
public void unsubscribeBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingFalseReturningTestTarget(counter);
MessageTarget target2 = new CountingFalseReturningTestTarget(counter);
MessageTarget target3 = new CountingFalseReturningTestTarget(counter);
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -95,9 +94,9 @@ public class SimpleDispatcherTests {
public void unsubscribeBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingFalseReturningTestTarget(counter);
MessageTarget target2 = new CountingFalseReturningTestTarget(counter);
MessageTarget target3 = new CountingFalseReturningTestTarget(counter);
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -117,8 +116,6 @@ public class SimpleDispatcherTests {
@Test
public void handlersWithSelectorsAndOneAccepts() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
@@ -144,8 +141,6 @@ public class SimpleDispatcherTests {
@Test
public void handlersWithSelectorsAndNoneAccept() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
final CountDownLatch latch = new CountDownLatch(2);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
@@ -175,25 +170,45 @@ public class SimpleDispatcherTests {
}
@Test
public void handlersThrowingExceptionUntilRetried() throws InterruptedException {
public void firstHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
final AtomicInteger handlerCounter = new AtomicInteger();
TestMessageHandler handler1 = new TestMessageHandler(handlerCounter, 4);
TestMessageHandler handler2 = new TestMessageHandler(handlerCounter, 4);
TestMessageHandler handler3 = new TestMessageHandler(handlerCounter, 2);
DefaultEndpoint<?> endpoint1 = new DefaultEndpoint<MessageHandler>(handler1);
DefaultEndpoint<?> endpoint2 = new DefaultEndpoint<MessageHandler>(handler2);
DefaultEndpoint<?> endpoint3 = new DefaultEndpoint<MessageHandler>(handler3);
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
dispatcher.send(new StringMessage("test"));
assertEquals("handlers should have been invoked 9 times in total", 9, handlerCounter.get());
assertFalse("first handler should not have handled the message", handler1.handledMessage);
assertFalse("second handler should not have handled the message", handler2.handledMessage);
assertTrue("third handler should have handled the message", handler3.handledMessage);
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, true);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
assertTrue(dispatcher.send(new StringMessage("test")));
assertEquals("only the first target should have been invoked", 1, counter.get());
}
@Test
public void middleHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, true);
MessageTarget target3 = new CountingTestTarget(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
assertTrue(dispatcher.send(new StringMessage("test")));
assertEquals("first two targets should have been invoked", 2, counter.get());
}
@Test
public void allHandlersReturnFalse() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingTestTarget(counter, false);
MessageTarget target2 = new CountingTestTarget(counter, false);
MessageTarget target3 = new CountingTestTarget(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
assertFalse(dispatcher.send(new StringMessage("test")));
assertEquals("each target should have been invoked", 3, counter.get());
}
@@ -220,53 +235,20 @@ public class SimpleDispatcherTests {
}
private static class TestMessageHandler implements MessageHandler {
private final AtomicInteger internalCounter = new AtomicInteger();
private final AtomicInteger sharedCounter;
private final int timesToFail;
private volatile boolean handledMessage = false;
TestMessageHandler(AtomicInteger sharedCounter, int timesToReject) {
this.sharedCounter = sharedCounter;
this.timesToFail = timesToReject;
}
public Message<?> handle(Message<?> message) {
int count = internalCounter.incrementAndGet();
this.sharedCounter.incrementAndGet();
if (this.timesToFail == 0) {
this.handledMessage = true;
return null;
}
if (this.timesToFail < 0) {
throw new MessageHandlingException(message, "intentional test failure");
}
if (count > timesToFail) {
this.handledMessage = true;
return null;
}
else {
throw new MessageHandlingException(message, "intentional test failure");
}
}
}
private static class CountingFalseReturningTestTarget implements MessageTarget {
private static class CountingTestTarget implements MessageTarget {
private final AtomicInteger counter;
CountingFalseReturningTestTarget(AtomicInteger counter) {
private final boolean returnValue;
CountingTestTarget(AtomicInteger counter, boolean returnValue) {
this.counter = counter;
this.returnValue = returnValue;
}
public boolean send(Message<?> message) {
this.counter.incrementAndGet();
return false;
return this.returnValue;
}
}