AbstractDispatcher now uses a Set instead of List (INT-335).

This commit is contained in:
Mark Fisher
2008-08-14 12:58:21 +00:00
parent 3e15df93ed
commit 1cc799b438
3 changed files with 247 additions and 55 deletions

View File

@@ -29,6 +29,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -54,9 +55,14 @@ public class BroadcastingDispatcherTests {
private Message<?> messageMock = createMock(Message.class);
private MessageTarget targetMock = createMock(MessageTarget.class);
private MessageTarget targetMock1 = createMock(MessageTarget.class);
private Object[] globalMocks = new Object[] { messageMock, taskExecutorMock, targetMock };
private MessageTarget targetMock2 = createMock(MessageTarget.class);
private MessageTarget targetMock3 = createMock(MessageTarget.class);
private Object[] globalMocks = new Object[] {
messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 };
@Before
@@ -69,60 +75,168 @@ public class BroadcastingDispatcherTests {
@Test
public void publishSubcribe() throws Exception {
public void singleTargetWithoutTaskExecutor() throws Exception {
dispatcher.setTaskExecutor(null);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
expect(targetMock.send(messageMock)).andReturn(true).times(2);
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsWithExecutor() {
// should the same target be allowed to be added twice?
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
expect(targetMock.send(messageMock)).andReturn(true).times(3);
public void singleTargetWithTaskExecutor() throws Exception {
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsPartialFailure() {
public void multipleTargetsWithoutTaskExecutor() {
dispatcher.setTaskExecutor(null);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsWithTaskExecutor() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsPartialFailureFirst() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
partialFailingExecutorMock(true, false, true);
expect(targetMock.send(messageMock)).andReturn(true).times(2);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(false, true, true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsPartialFailureMiddle() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(true, false, true);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsPartialFailureLast() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(true, true, false);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void multipleTargetsAllFail() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(false, false, false);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void noDuplicateSubscription() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void unsubscribeBeforeSend() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.unsubscribe(targetMock2);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
replay(globalMocks);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test
public void unsubscribeBetweenSends() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true).times(2);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true).times(2);
replay(globalMocks);
dispatcher.send(messageMock);
dispatcher.unsubscribe(targetMock2);
dispatcher.send(messageMock);
verify(globalMocks);
}
@Test(timeout = 500)
public void multipleTargetsPartialTimeout() throws Exception {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.setTimeout(50);
// three threads invoking targets
final CountDownLatch latch = new CountDownLatch(3);
threadedExecutorMock(3);
final AtomicBoolean timingOutStarted = new AtomicBoolean(false);
final AtomicBoolean testNotTimedOut = new AtomicBoolean(false);
expect(targetMock.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
expect(targetMock1.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
latch.countDown();
return true;
}
}).times(2);
});
expect(targetMock2.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
latch.countDown();
return true;
}
});
/*
* Watch out, this is tricky. The send() method will be invoked but due
* to the faked time out it will never return. Therefore the expectation
@@ -130,7 +244,7 @@ public class BroadcastingDispatcherTests {
* This is something that EasyMock doesn't support so I've worked around
* it with an AtomicBoolean and a latch. It isn't pretty, but it sort of works
*/
expect(targetMock.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
expect(targetMock3.send(messageMock)).andAnswer(new IAnswer<Boolean>() {
public Boolean answer() throws Throwable {
// this should happen
timingOutStarted.compareAndSet(false, true);
@@ -153,15 +267,11 @@ public class BroadcastingDispatcherTests {
@Test
public void applySequenceDisabledByDefault() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
final List<Message<?>> messages = new ArrayList<Message<?>>();
MessageTarget target = new MessageTarget() {
public boolean send(Message<?> message) {
messages.add(message);
return true;
}
};
dispatcher.subscribe(target);
dispatcher.subscribe(target);
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageTarget target1 = new MessageStoringTestTarget(messages);
MessageTarget target2 = new MessageStoringTestTarget(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.send(new StringMessage("test"));
assertEquals(2, messages.size());
assertEquals(0, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -174,16 +284,13 @@ public class BroadcastingDispatcherTests {
public void applySequenceEnabled() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
dispatcher.setApplySequence(true);
final List<Message<?>> messages = new ArrayList<Message<?>>();
MessageTarget target = new MessageTarget() {
public boolean send(Message<?> message) {
messages.add(message);
return true;
}
};
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageTarget target1 = new MessageStoringTestTarget(messages);
MessageTarget target2 = new MessageStoringTestTarget(messages);
MessageTarget target3 = new MessageStoringTestTarget(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.send(new StringMessage("test"));
assertEquals(3, messages.size());
assertEquals(1, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -210,7 +317,7 @@ public class BroadcastingDispatcherTests {
*/
private void partialFailingExecutorMock(boolean... passes) {
taskExecutorMock.execute(isA(Runnable.class));
for (final boolean pass : passes)
for (final boolean pass : passes) {
expectLastCall().andAnswer(new IAnswer<Object>() {
public Object answer() throws Throwable {
if (pass) {
@@ -219,6 +326,7 @@ public class BroadcastingDispatcherTests {
return null;
}
});
}
}
/*
@@ -235,4 +343,20 @@ public class BroadcastingDispatcherTests {
}
}).times(count);
}
}
private static class MessageStoringTestTarget implements MessageTarget {
private final List<Message<?>> messageList;
MessageStoringTestTarget(List<Message<?>> messageList) {
this.messageList = messageList;
}
public boolean send(Message<?> message) {
this.messageList.add(message);
return true;
}
};
}

View File

@@ -32,6 +32,7 @@ import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
@@ -41,7 +42,7 @@ import org.springframework.integration.message.selector.MessageSelector;
public class SimpleDispatcherTests {
@Test
public void testSingleMessage() throws InterruptedException {
public void singleMessage() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch)));
@@ -51,7 +52,7 @@ public class SimpleDispatcherTests {
}
@Test
public void testPointToPoint() throws InterruptedException {
public void pointToPoint() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
@@ -65,7 +66,56 @@ public class SimpleDispatcherTests {
}
@Test
public void testHandlersWithSelectorsAndOneAccepts() throws InterruptedException {
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target = new CountingFalseReturningTestTarget(counter);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test"));
assertEquals("target should not have duplicate subscriptions", 1, counter.get());
}
@Test
public void unsubscribeBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingFalseReturningTestTarget(counter);
MessageTarget target2 = new CountingFalseReturningTestTarget(counter);
MessageTarget target3 = new CountingFalseReturningTestTarget(counter);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.unsubscribe(target2);
dispatcher.send(new StringMessage("test"));
assertEquals(2, counter.get());
}
@Test
public void unsubscribeBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageTarget target1 = new CountingFalseReturningTestTarget(counter);
MessageTarget target2 = new CountingFalseReturningTestTarget(counter);
MessageTarget target3 = new CountingFalseReturningTestTarget(counter);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.send(new StringMessage("test1"));
assertEquals(3, counter.get());
dispatcher.unsubscribe(target2);
dispatcher.send(new StringMessage("test2"));
assertEquals(5, counter.get());
dispatcher.unsubscribe(target1);
dispatcher.send(new StringMessage("test3"));
assertEquals(6, counter.get());
dispatcher.unsubscribe(target3);
dispatcher.send(new StringMessage("test4"));
assertEquals(6, counter.get());
}
@Test
public void handlersWithSelectorsAndOneAccepts() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
@@ -91,8 +141,8 @@ public class SimpleDispatcherTests {
assertEquals("handler with accepting selector should have received the message", 1, counter3.get());
}
@Test()
public void testHandlersWithSelectorsAndNoneAccept() throws InterruptedException {
@Test
public void handlersWithSelectorsAndNoneAccept() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
@@ -125,7 +175,7 @@ public class SimpleDispatcherTests {
}
@Test
public void testHandlersThrowingExceptionUntilRetried() throws InterruptedException {
public void handlersThrowingExceptionUntilRetried() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
dispatcher.setRejectionLimit(5);
dispatcher.setRetryInterval(5);
@@ -205,4 +255,19 @@ public class SimpleDispatcherTests {
}
}
private static class CountingFalseReturningTestTarget implements MessageTarget {
private final AtomicInteger counter;
CountingFalseReturningTestTarget(AtomicInteger counter) {
this.counter = counter;
}
public boolean send(Message<?> message) {
this.counter.incrementAndGet();
return false;
}
}
}