Subscribable is now SubscribableChannel, and MessageDispatcher no longer implements Subscribable.

This commit is contained in:
Mark Fisher
2008-10-02 17:50:18 +00:00
parent 3f9230d73b
commit e5a8972a97
11 changed files with 124 additions and 122 deletions

View File

@@ -72,7 +72,7 @@ public class BroadcastingDispatcherTests {
@Test
public void singleTargetWithoutTaskExecutor() throws Exception {
dispatcher.setTaskExecutor(null);
dispatcher.subscribe(targetMock1);
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
@@ -82,7 +82,7 @@ public class BroadcastingDispatcherTests {
@Test
public void singleTargetWithTaskExecutor() throws Exception {
dispatcher.subscribe(targetMock1);
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
@@ -93,9 +93,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsWithoutTaskExecutor() {
dispatcher.setTaskExecutor(null);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
@@ -109,9 +109,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsWithTaskExecutor() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
@@ -126,9 +126,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureFirst() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
partialFailingExecutorMock(false, true, true);
targetMock2.onMessage(messageMock);
expectLastCall();
@@ -142,9 +142,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureMiddle() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
partialFailingExecutorMock(true, false, true);
targetMock1.onMessage(messageMock);
expectLastCall();
@@ -158,9 +158,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsPartialFailureLast() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
partialFailingExecutorMock(true, true, false);
targetMock1.onMessage(messageMock);
expectLastCall();
@@ -174,9 +174,9 @@ public class BroadcastingDispatcherTests {
@Test
public void multipleTargetsAllFail() {
reset(taskExecutorMock);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
partialFailingExecutorMock(false, false, false);
replay(globalMocks);
dispatcher.dispatch(messageMock);
@@ -185,9 +185,9 @@ public class BroadcastingDispatcherTests {
@Test
public void noDuplicateSubscription() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock1);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
@@ -196,11 +196,11 @@ public class BroadcastingDispatcherTests {
}
@Test
public void unsubscribeBeforeSend() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.unsubscribe(targetMock2);
public void removeConsumerBeforeSend() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
dispatcher.removeConsumer(targetMock2);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
@@ -211,10 +211,10 @@ public class BroadcastingDispatcherTests {
}
@Test
public void unsubscribeBetweenSends() {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
public void removeConsumerBetweenSends() {
dispatcher.addConsumer(targetMock1);
dispatcher.addConsumer(targetMock2);
dispatcher.addConsumer(targetMock3);
targetMock1.onMessage(messageMock);
expectLastCall().times(2);
targetMock2.onMessage(messageMock);
@@ -223,7 +223,7 @@ public class BroadcastingDispatcherTests {
expectLastCall().times(2);
replay(globalMocks);
dispatcher.dispatch(messageMock);
dispatcher.unsubscribe(targetMock2);
dispatcher.removeConsumer(targetMock2);
dispatcher.dispatch(messageMock);
verify(globalMocks);
}
@@ -234,8 +234,8 @@ public class BroadcastingDispatcherTests {
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(2, messages.size());
assertEquals(0, (int) messages.get(0).getHeaders().getSequenceNumber());
@@ -252,9 +252,9 @@ public class BroadcastingDispatcherTests {
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
MessageConsumer target3 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(3, messages.size());
assertEquals(1, (int) messages.get(0).getHeaders().getSequenceNumber());

View File

@@ -45,7 +45,7 @@ public class SimpleDispatcherTests {
public void singleMessage() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch)));
dispatcher.addConsumer(createEndpoint(TestHandlers.countDownHandler(latch)));
dispatcher.dispatch(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -57,8 +57,8 @@ public class SimpleDispatcherTests {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.addConsumer(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
dispatcher.addConsumer(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
dispatcher.dispatch(new StringMessage("test"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
@@ -70,8 +70,8 @@ public class SimpleDispatcherTests {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.addConsumer(target);
dispatcher.addConsumer(target);
try {
dispatcher.dispatch(new StringMessage("test"));
}
@@ -82,16 +82,16 @@ public class SimpleDispatcherTests {
}
@Test
public void unsubscribeBeforeSend() {
public void removeConsumerBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.unsubscribe(target2);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
dispatcher.removeConsumer(target2);
try {
dispatcher.dispatch(new StringMessage("test"));
}
@@ -102,15 +102,15 @@ public class SimpleDispatcherTests {
}
@Test
public void unsubscribeBetweenSends() {
public void removeConsumerBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
try {
dispatcher.dispatch(new StringMessage("test1"));
}
@@ -118,7 +118,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(3, counter.get());
dispatcher.unsubscribe(target2);
dispatcher.removeConsumer(target2);
try {
dispatcher.dispatch(new StringMessage("test2"));
}
@@ -126,7 +126,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(5, counter.get());
dispatcher.unsubscribe(target1);
dispatcher.removeConsumer(target1);
try {
dispatcher.dispatch(new StringMessage("test3"));
}
@@ -137,11 +137,11 @@ public class SimpleDispatcherTests {
}
@Test(expected = MessageDeliveryException.class)
public void unsubscribeLastTargetCausesDeliveryException() {
public void removeConsumerLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.addConsumer(target);
try {
dispatcher.dispatch(new StringMessage("test1"));
}
@@ -149,7 +149,7 @@ public class SimpleDispatcherTests {
// ignore
}
assertEquals(1, counter.get());
dispatcher.unsubscribe(target);
dispatcher.removeConsumer(target);
dispatcher.dispatch(new StringMessage("test2"));
}
@@ -167,9 +167,9 @@ public class SimpleDispatcherTests {
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, true));
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
dispatcher.addConsumer(endpoint1);
dispatcher.addConsumer(endpoint2);
dispatcher.addConsumer(endpoint3);
dispatcher.dispatch(new StringMessage("test"));
assertEquals(0, latch.getCount());
assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get());
@@ -192,9 +192,9 @@ public class SimpleDispatcherTests {
endpoint1.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint2.setSelector(new TestMessageSelector(selectorCounter, false));
endpoint3.setSelector(new TestMessageSelector(selectorCounter, false));
dispatcher.subscribe(endpoint1);
dispatcher.subscribe(endpoint2);
dispatcher.subscribe(endpoint3);
dispatcher.addConsumer(endpoint1);
dispatcher.addConsumer(endpoint2);
dispatcher.addConsumer(endpoint3);
boolean exceptionThrown = false;
try {
dispatcher.dispatch(new StringMessage("test"));
@@ -216,9 +216,9 @@ public class SimpleDispatcherTests {
MessageConsumer target1 = new CountingTestEndpoint(counter, true);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
assertTrue(dispatcher.dispatch(new StringMessage("test")));
assertEquals("only the first target should have been invoked", 1, counter.get());
}
@@ -230,9 +230,9 @@ public class SimpleDispatcherTests {
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, true);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
assertTrue(dispatcher.dispatch(new StringMessage("test")));
assertEquals("first two targets should have been invoked", 2, counter.get());
}
@@ -244,9 +244,9 @@ public class SimpleDispatcherTests {
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.addConsumer(target1);
dispatcher.addConsumer(target2);
dispatcher.addConsumer(target3);
try {
assertFalse(dispatcher.dispatch(new StringMessage("test")));
}

View File

@@ -49,7 +49,7 @@ public class ChannelPollerTests {
@Before
public void init() {
poller = new ChannelPoller(channelMock, triggerMock);
poller.subscribe(endpointMock);
poller.setConsumer(endpointMock);
poller.setReceiveTimeout(-1);
reset(globalMocks);
}
@@ -113,7 +113,7 @@ public class ChannelPollerTests {
@Test
public void blockingSourceTimedOut() {
poller = new ChannelPoller(channelMock, triggerMock);
poller.subscribe(endpointMock);
poller.setConsumer(endpointMock);
// we don't need to await the timeout, returning null suffices
expect(channelMock.receive(1)).andReturn(null);
replay(globalMocks);
@@ -125,7 +125,7 @@ public class ChannelPollerTests {
@Test
public void blockingSourceNotTimedOut() {
poller = new ChannelPoller(channelMock, triggerMock);
poller.subscribe(endpointMock);
poller.setConsumer(endpointMock);
expect(channelMock.receive(1)).andReturn(messageMock);
endpointMock.onMessage(messageMock);
expectLastCall();