IN PROGRESS - issue INT-567: Add round-robin dispatching strategy

http://jira.springframework.org/browse/INT-567

Simplified code and added concurrent tests
This commit is contained in:
Iwein Fuld
2009-03-09 19:14:21 +00:00
parent 242d8291ec
commit 01fe7a0baf
8 changed files with 223 additions and 90 deletions

View File

@@ -0,0 +1,111 @@
package org.springframework.integration.dispatcher;
import static org.junit.Assert.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnit44Runner;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@RunWith(MockitoJUnit44Runner.class)
public class LoadBalancingDispatcherConcurrentTests {
private static final int TOTAL_EXECUTIONS = 40;
private AbstractWinningHandlerDispatcher dispatcher = new LoadBalancingDispatcher();
private ThreadPoolTaskExecutor scheduler = new ThreadPoolTaskExecutor();
@Mock
private MessageHandler handler1;
@Mock
private MessageHandler handler2;
@Mock
private MessageHandler handler3;
@Mock
private MessageHandler handler4;
@Mock
private Message<?> message;
@Before
public void initialize() throws Exception {
scheduler.setCorePoolSize(10);
scheduler.setMaxPoolSize(10);
scheduler.initialize();
}
@Test(timeout = 1000)
public void noHandlerExhaustion() throws Exception {
dispatcher.addHandler(handler1);
dispatcher.addHandler(handler2);
dispatcher.addHandler(handler3);
dispatcher.addHandler(handler4);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(TOTAL_EXECUTIONS);
final Message<?> message = this.message;
final AtomicBoolean failed = new AtomicBoolean(false);
Runnable messageSenderTask = new Runnable() {
public void run() {
try {
start.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (!dispatcher.dispatch(message)) {
failed.set(true);
}
allDone.countDown();
}
};
for (int i = 0; i < TOTAL_EXECUTIONS; i++) {
scheduler.execute(messageSenderTask);
}
start.countDown();
allDone.await();
assertFalse("not all messages were accepted", failed.get());
}
@Test(timeout=2000)
public void unlockOnFailure() throws Exception {
// dispatcher has no subscribers (shouldn't lead to deadlock)
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(TOTAL_EXECUTIONS);
final Message<?> message = this.message;
Runnable messageSenderTask = new Runnable() {
public void run() {
try {
start.await();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
try {
dispatcher.dispatch(message);
fail("this shouldn't happen");
}
catch (MessageDeliveryException e) {
//expected
}
allDone.countDown();
}
};
for (int i = 0; i < TOTAL_EXECUTIONS; i++) {
scheduler.execute(messageSenderTask);
}
start.countDown();
allDone.await();
}
}

View File

@@ -17,10 +17,13 @@ package org.springframework.integration.dispatcher;
import static org.mockito.Mockito.*;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnit44Runner;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.MessageHandler;
@@ -32,7 +35,7 @@ import org.springframework.integration.message.MessageHandler;
@RunWith(MockitoJUnit44Runner.class)
public class LoadBalancingDispatcherTests {
private LoadBalancingDispatcher dispatcher = new LoadBalancingDispatcher();
private AbstractWinningHandlerDispatcher dispatcher = new LoadBalancingDispatcher();
@Mock
private MessageHandler handler;
@@ -58,4 +61,16 @@ public class LoadBalancingDispatcherTests {
verify(handler).handleMessage(message);
verify(differentHandler).handleMessage(message);
}
@Test
public void overFlowCurrentHandlerIndex() throws Exception {
dispatcher.addHandler(handler);
dispatcher.addHandler(differentHandler);
DirectFieldAccessor accessor = new DirectFieldAccessor(dispatcher);
((AtomicInteger)accessor.getPropertyValue("currentHandlerIndex")).set(Integer.MAX_VALUE-5);
for(long i=0; i < 40; i++){
dispatcher.dispatch(message);
}
verify(handler, atLeast(18)).handleMessage(message);
verify(differentHandler, atLeast(18)).handleMessage(message);
}
}