OPEN - issue INT-567: Add round-robin dispatching strategy

http://jira.springframework.org/browse/INT-567

Fixed issue with concurrent failures and removed triedHandlers
This commit is contained in:
Iwein Fuld
2009-03-10 08:11:58 +00:00
parent 8f122cf58c
commit efea914ed1
7 changed files with 118 additions and 41 deletions

View File

@@ -1,6 +1,7 @@
package org.springframework.integration.dispatcher;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -13,6 +14,7 @@ import org.mockito.runners.MockitoJUnit44Runner;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@RunWith(MockitoJUnit44Runner.class)
@@ -20,7 +22,7 @@ public class RoundRobinDispatcherConcurrentTests {
private static final int TOTAL_EXECUTIONS = 40;
private AbstractWinningHandlerDispatcher dispatcher = new RoundRobinDispatcher();
private AbstractHandleOnceDispatcher dispatcher = new RoundRobinDispatcher();
private ThreadPoolTaskExecutor scheduler = new ThreadPoolTaskExecutor();
@@ -76,9 +78,13 @@ public class RoundRobinDispatcherConcurrentTests {
start.countDown();
allDone.await();
assertFalse("not all messages were accepted", failed.get());
verify(handler1, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
verify(handler2, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
verify(handler3, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
verify(handler4, times(TOTAL_EXECUTIONS / 4)).handleMessage(message);
}
@Test(timeout=2000)
@Test(timeout = 2000)
public void unlockOnFailure() throws Exception {
// dispatcher has no subscribers (shouldn't lead to deadlock)
final CountDownLatch start = new CountDownLatch(1);
@@ -97,7 +103,7 @@ public class RoundRobinDispatcherConcurrentTests {
fail("this shouldn't happen");
}
catch (MessageDeliveryException e) {
//expected
// expected
}
allDone.countDown();
}
@@ -108,4 +114,39 @@ public class RoundRobinDispatcherConcurrentTests {
start.countDown();
allDone.await();
}
@Test
public void noHandlerSkipUnderConcurrentFailure() throws Exception {
dispatcher.addHandler(handler1);
dispatcher.addHandler(handler2);
doThrow(new MessageRejectedException(message)).when(handler1).handleMessage(message);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(TOTAL_EXECUTIONS);
final Message<?> message = this.message;
final AtomicBoolean failed = new AtomicBoolean(false);
Runnable messageSenderTask = new Runnable() {
public void run() {
try {
start.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!dispatcher.dispatch(message)) {
failed.set(true);
}
else {
allDone.countDown();
}
}
};
for (int i = 0; i < TOTAL_EXECUTIONS; i++) {
scheduler.execute(messageSenderTask);
}
start.countDown();
allDone.await();
assertFalse("not all messages were accepted", failed.get());
verify(handler1, times(TOTAL_EXECUTIONS/2)).handleMessage(message);
verify(handler2, times(TOTAL_EXECUTIONS)).handleMessage(message);
}
}

View File

@@ -35,7 +35,7 @@ import org.springframework.integration.message.MessageHandler;
@RunWith(MockitoJUnit44Runner.class)
public class RoundRobinDispatcherTests {
private AbstractWinningHandlerDispatcher dispatcher = new RoundRobinDispatcher();
private AbstractHandleOnceDispatcher dispatcher = new RoundRobinDispatcher();
@Mock
private MessageHandler handler;