MessageDispatcher now extends SubscribableSource. The 'addTarget' and 'removeTarget' methods have been replaced with 'subscribe' and 'unsubscribe' respectively.

This commit is contained in:
Mark Fisher
2008-08-14 11:53:24 +00:00
parent 234c87da1d
commit 3e15df93ed
8 changed files with 96 additions and 92 deletions

View File

@@ -71,8 +71,8 @@ public class BroadcastingDispatcherTests {
@Test
public void publishSubcribe() throws Exception {
dispatcher.setTaskExecutor(null);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
expect(targetMock.send(messageMock)).andReturn(true).times(2);
replay(globalMocks);
dispatcher.send(messageMock);
@@ -82,9 +82,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsWithExecutor() {
// should the same target be allowed to be added twice?
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
expect(targetMock.send(messageMock)).andReturn(true).times(3);
replay(globalMocks);
dispatcher.send(messageMock);
@@ -94,9 +94,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailure() {
reset(taskExecutorMock);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
partialFailingExecutorMock(true, false, true);
expect(targetMock.send(messageMock)).andReturn(true).times(2);
replay(globalMocks);
@@ -107,9 +107,9 @@ public class BroadcastingDispatcherTests {
@Test(timeout = 500)
public void multipleTargetsPartialTimeout() throws Exception {
reset(taskExecutorMock);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.addTarget(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.subscribe(targetMock);
dispatcher.setTimeout(50);
// three threads invoking targets
final CountDownLatch latch = new CountDownLatch(3);
@@ -160,8 +160,8 @@ public class BroadcastingDispatcherTests {
return true;
}
};
dispatcher.addTarget(target);
dispatcher.addTarget(target);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test"));
assertEquals(2, messages.size());
assertEquals(0, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -181,9 +181,9 @@ public class BroadcastingDispatcherTests {
return true;
}
};
dispatcher.addTarget(target);
dispatcher.addTarget(target);
dispatcher.addTarget(target);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.send(new StringMessage("test"));
assertEquals(3, messages.size());
assertEquals(1, (int) messages.get(0).getHeaders().getSequenceNumber());

View File

@@ -44,7 +44,7 @@ public class SimpleDispatcherTests {
public void testSingleMessage() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
dispatcher.addTarget(createEndpoint(TestHandlers.countDownHandler(latch)));
dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch)));
dispatcher.send(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -56,8 +56,8 @@ public class SimpleDispatcherTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.send(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -80,9 +80,9 @@ public class SimpleDispatcherTests {
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, true));
dispatcher.addTarget(endpoint1);
dispatcher.addTarget(endpoint2);
dispatcher.addTarget(endpoint3);
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
dispatcher.send(new StringMessage("test"));
assertEquals(0, latch.getCount());
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
@@ -107,9 +107,9 @@ public class SimpleDispatcherTests {
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, false));
dispatcher.addTarget(endpoint1);
dispatcher.addTarget(endpoint2);
dispatcher.addTarget(endpoint3);
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
boolean exceptionThrown = false;
try {
dispatcher.send(new StringMessage("test"));
@@ -136,9 +136,9 @@ public class SimpleDispatcherTests {
DefaultEndpoint<?> endpoint1 = new DefaultEndpoint<MessageHandler>(handler1);
DefaultEndpoint<?> endpoint2 = new DefaultEndpoint<MessageHandler>(handler2);
DefaultEndpoint<?> endpoint3 = new DefaultEndpoint<MessageHandler>(handler3);
dispatcher.addTarget(endpoint1);
dispatcher.addTarget(endpoint2);
dispatcher.addTarget(endpoint3);
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
dispatcher.send(new StringMessage("test"));
assertEquals("handlers should have been invoked 9 times in total", 9, handlerCounter.get());
assertFalse("first handler should not have handled the message", handler1.handledMessage);