diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java index 70a905c527..01a4b5e2f2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractSubscribableChannel.java @@ -19,7 +19,7 @@ package org.springframework.integration.channel; import org.springframework.integration.dispatcher.MessageDispatcher; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageConsumer; -import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.SubscribableChannel; import org.springframework.util.Assert; /** @@ -28,7 +28,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class AbstractSubscribableChannel extends AbstractMessageChannel implements Subscribable { +public class AbstractSubscribableChannel extends AbstractMessageChannel implements SubscribableChannel { private final T dispatcher; @@ -44,11 +44,11 @@ public class AbstractSubscribableChannel extends Ab } public boolean subscribe(MessageConsumer consumer) { - return this.dispatcher.subscribe(consumer); + return this.dispatcher.addConsumer(consumer); } public boolean unsubscribe(MessageConsumer consumer) { - return this.dispatcher.unsubscribe(consumer); + return this.dispatcher.removeConsumer(consumer); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java index 4f488c2107..05f3d81293 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java @@ -36,17 +36,17 @@ public abstract class AbstractDispatcher implements MessageDispatcher { protected final Log logger = LogFactory.getLog(this.getClass()); - protected final Set subscribers = new CopyOnWriteArraySet(); + protected final Set consumers = new CopyOnWriteArraySet(); private volatile TaskExecutor taskExecutor; - public boolean subscribe(MessageConsumer consumer) { - return this.subscribers.add(consumer); + public boolean addConsumer(MessageConsumer consumer) { + return this.consumers.add(consumer); } - public boolean unsubscribe(MessageConsumer consumer) { - return this.subscribers.remove(consumer); + public boolean removeConsumer(MessageConsumer consumer) { + return this.consumers.remove(consumer); } /** @@ -63,7 +63,7 @@ public abstract class AbstractDispatcher implements MessageDispatcher { } public String toString() { - return this.getClass().getSimpleName() + " with subscribers: " + this.subscribers; + return this.getClass().getSimpleName() + " with consumers: " + this.consumers; } /** @@ -77,7 +77,7 @@ public abstract class AbstractDispatcher implements MessageDispatcher { } catch (MessageRejectedException e) { if (logger.isDebugEnabled()) { - logger.debug("Consumer '" + consumer + "' rejected Message, continuing with other subscribers if available.", e); + logger.debug("Consumer '" + consumer + "' rejected Message, continuing with other consumers if available.", e); } return false; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index c0b211e22c..e1a53dabba 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -23,9 +23,9 @@ import org.springframework.integration.message.MessageConsumer; /** * A broadcasting dispatcher implementation. It makes a best effort to - * send the message to each of its endpoints. If it fails to send to any - * one endpoints, it will log a warn-level message but continue to send - * to the other endpoints. + * send the message to each of its consumers. If it fails to send to any + * one consumer, it will log a warn-level message but continue to send + * to the other consumers. * * @author Mark Fisher */ @@ -45,8 +45,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher { public boolean dispatch(Message message) { int sequenceNumber = 1; - int sequenceSize = this.subscribers.size(); - for (final MessageConsumer consumer : this.subscribers) { + int sequenceSize = this.consumers.size(); + for (final MessageConsumer consumer : this.consumers) { final Message messageToSend = (!this.applySequence) ? message : MessageBuilder.fromMessage(message) .setSequenceNumber(sequenceNumber++) diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java index 1dd6051d82..39b0b8b1f4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java @@ -17,14 +17,18 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.message.Message; -import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.MessageConsumer; /** - * Strategy interface for dispatching messages. + * Strategy interface for dispatching messages to consumers. * * @author Mark Fisher */ -public interface MessageDispatcher extends Subscribable { +public interface MessageDispatcher { + + boolean addConsumer(MessageConsumer consumer); + + boolean removeConsumer(MessageConsumer consumer); boolean dispatch(Message message); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index 8c868abaae..d0e13925ee 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -23,11 +23,11 @@ import org.springframework.integration.message.MessageRejectedException; /** * Basic implementation of {@link MessageDispatcher} that will attempt - * to send a {@link Message} to one of its subscribers. As soon as one - * of the subscribers accepts the Message, the dispatcher will return 'true'. + * to send a {@link Message} to one of its consumers. As soon as one + * of the consumers accepts the Message, the dispatcher will return 'true'. *

- * If the dispatcher has no subscribers, a {@link MessageDeliveryException} - * will be thrown. If all subscribers reject the Message, the dispatcher will + * If the dispatcher has no consumers, a {@link MessageDeliveryException} + * will be thrown. If all consumers reject the Message, the dispatcher will * throw a MessageRejectedException. * * @author Mark Fisher @@ -35,12 +35,12 @@ import org.springframework.integration.message.MessageRejectedException; public class SimpleDispatcher extends AbstractDispatcher { public boolean dispatch(Message message) { - if (this.subscribers.size() == 0) { + if (this.consumers.size() == 0) { throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } int count = 0; int rejectedExceptionCount = 0; - for (MessageConsumer consumer : this.subscribers) { + for (MessageConsumer consumer : this.consumers) { count++; if (this.sendMessageToConsumer(message, consumer)) { return true; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java index 60b98b530e..8e5c431604 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java @@ -26,7 +26,7 @@ import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.SubscribableChannel; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; import org.springframework.util.Assert; @@ -90,7 +90,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint if (this.taskExecutor != null) { this.poller.setTaskExecutor(this.taskExecutor); } - this.poller.subscribe(this); + this.poller.setConsumer(this); } this.initialized = true; } @@ -105,8 +105,8 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint this.afterPropertiesSet(); } Assert.notNull(this.inputChannel, "failed to start endpoint, inputChannel is required"); - if (this.inputChannel instanceof Subscribable) { - ((Subscribable) inputChannel).subscribe(this); + if (this.inputChannel instanceof SubscribableChannel) { + ((SubscribableChannel) inputChannel).subscribe(this); } else if (this.inputChannel instanceof PollableChannel) { Assert.notNull(this.getTaskScheduler(), @@ -122,8 +122,8 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint if (!this.running) { return; } - if (this.inputChannel instanceof Subscribable) { - ((Subscribable) inputChannel).unsubscribe(this); + if (this.inputChannel instanceof SubscribableChannel) { + ((SubscribableChannel) inputChannel).unsubscribe(this); } else if (this.pollerFuture != null) { this.pollerFuture.cancel(true); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java index 27f0d8ed05..9f9bc5657e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java @@ -17,23 +17,21 @@ package org.springframework.integration.endpoint; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.dispatcher.SimpleDispatcher; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageConsumer; -import org.springframework.integration.message.Subscribable; import org.springframework.integration.scheduling.Trigger; import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ChannelPoller extends AbstractPoller implements Subscribable { +public class ChannelPoller extends AbstractPoller { private final PollableChannel channel; - private volatile long receiveTimeout = 1000; + private volatile MessageConsumer consumer; - private final SimpleDispatcher dispatcher = new SimpleDispatcher(); + private volatile long receiveTimeout = 1000; public ChannelPoller(PollableChannel channel, Trigger trigger) { @@ -51,23 +49,21 @@ public class ChannelPoller extends AbstractPoller implements Subscribable { this.receiveTimeout = receiveTimeout; } - public boolean subscribe(MessageConsumer consumer) { - return this.dispatcher.subscribe(consumer); - } - - public boolean unsubscribe(MessageConsumer consumer) { - return this.dispatcher.unsubscribe(consumer); + public void setConsumer(MessageConsumer consumer) { + this.consumer = consumer; } @Override protected boolean doPoll() { + Assert.notNull(this.consumer, "consumer must not be null"); Message message = (this.receiveTimeout >= 0) ? this.channel.receive(this.receiveTimeout) : this.channel.receive(); if (message == null) { return false; } - return this.dispatcher.dispatch(message); + this.consumer.onMessage(message); + return true; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableChannel.java similarity index 75% rename from org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java rename to org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableChannel.java index 4eda0a0694..dfbac156b4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableChannel.java @@ -16,20 +16,22 @@ package org.springframework.integration.message; +import org.springframework.integration.channel.MessageChannel; + /** - * Interface for any source of messages that accepts subscribers. + * Interface for any MessageChannel implementation that accepts subscribers. * * @author Mark Fisher */ -public interface Subscribable { +public interface SubscribableChannel extends MessageChannel { /** - * Register a {@link MessageConsumer} as a subscriber to this source. + * Register a {@link MessageConsumer} as a subscriber to this channel. */ boolean subscribe(MessageConsumer consumer); /** - * Remove a {@link MessageConsumer} from the subscribers of this source. + * Remove a {@link MessageConsumer} from the subscribers of this channel. */ boolean unsubscribe(MessageConsumer consumer); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index ae0f073fa1..2a5dba3b06 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -72,7 +72,7 @@ public class BroadcastingDispatcherTests { @Test public void singleTargetWithoutTaskExecutor() throws Exception { dispatcher.setTaskExecutor(null); - dispatcher.subscribe(targetMock1); + dispatcher.addConsumer(targetMock1); targetMock1.onMessage(messageMock); expectLastCall(); replay(globalMocks); @@ -82,7 +82,7 @@ public class BroadcastingDispatcherTests { @Test public void singleTargetWithTaskExecutor() throws Exception { - dispatcher.subscribe(targetMock1); + dispatcher.addConsumer(targetMock1); targetMock1.onMessage(messageMock); expectLastCall(); replay(globalMocks); @@ -93,9 +93,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsWithoutTaskExecutor() { dispatcher.setTaskExecutor(null); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); targetMock1.onMessage(messageMock); expectLastCall(); targetMock2.onMessage(messageMock); @@ -109,9 +109,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsWithTaskExecutor() { - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); targetMock1.onMessage(messageMock); expectLastCall(); targetMock2.onMessage(messageMock); @@ -126,9 +126,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureFirst() { reset(taskExecutorMock); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); partialFailingExecutorMock(false, true, true); targetMock2.onMessage(messageMock); expectLastCall(); @@ -142,9 +142,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureMiddle() { reset(taskExecutorMock); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); partialFailingExecutorMock(true, false, true); targetMock1.onMessage(messageMock); expectLastCall(); @@ -158,9 +158,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsPartialFailureLast() { reset(taskExecutorMock); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); partialFailingExecutorMock(true, true, false); targetMock1.onMessage(messageMock); expectLastCall(); @@ -174,9 +174,9 @@ public class BroadcastingDispatcherTests { @Test public void multipleTargetsAllFail() { reset(taskExecutorMock); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); partialFailingExecutorMock(false, false, false); replay(globalMocks); dispatcher.dispatch(messageMock); @@ -185,9 +185,9 @@ public class BroadcastingDispatcherTests { @Test public void noDuplicateSubscription() { - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock1); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock1); targetMock1.onMessage(messageMock); expectLastCall(); replay(globalMocks); @@ -196,11 +196,11 @@ public class BroadcastingDispatcherTests { } @Test - public void unsubscribeBeforeSend() { - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); - dispatcher.unsubscribe(targetMock2); + public void removeConsumerBeforeSend() { + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); + dispatcher.removeConsumer(targetMock2); targetMock1.onMessage(messageMock); expectLastCall(); targetMock3.onMessage(messageMock); @@ -211,10 +211,10 @@ public class BroadcastingDispatcherTests { } @Test - public void unsubscribeBetweenSends() { - dispatcher.subscribe(targetMock1); - dispatcher.subscribe(targetMock2); - dispatcher.subscribe(targetMock3); + public void removeConsumerBetweenSends() { + dispatcher.addConsumer(targetMock1); + dispatcher.addConsumer(targetMock2); + dispatcher.addConsumer(targetMock3); targetMock1.onMessage(messageMock); expectLastCall().times(2); targetMock2.onMessage(messageMock); @@ -223,7 +223,7 @@ public class BroadcastingDispatcherTests { expectLastCall().times(2); replay(globalMocks); dispatcher.dispatch(messageMock); - dispatcher.unsubscribe(targetMock2); + dispatcher.removeConsumer(targetMock2); dispatcher.dispatch(messageMock); verify(globalMocks); } @@ -234,8 +234,8 @@ public class BroadcastingDispatcherTests { final List> messages = Collections.synchronizedList(new ArrayList>()); MessageConsumer target1 = new MessageStoringTestEndpoint(messages); MessageConsumer target2 = new MessageStoringTestEndpoint(messages); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); dispatcher.dispatch(new StringMessage("test")); assertEquals(2, messages.size()); assertEquals(0, (int) messages.get(0).getHeaders().getSequenceNumber()); @@ -252,9 +252,9 @@ public class BroadcastingDispatcherTests { MessageConsumer target1 = new MessageStoringTestEndpoint(messages); MessageConsumer target2 = new MessageStoringTestEndpoint(messages); MessageConsumer target3 = new MessageStoringTestEndpoint(messages); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); dispatcher.dispatch(new StringMessage("test")); assertEquals(3, messages.size()); assertEquals(1, (int) messages.get(0).getHeaders().getSequenceNumber()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index bbbff0a741..34b7865f4d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -45,7 +45,7 @@ public class SimpleDispatcherTests { public void singleMessage() throws InterruptedException { SimpleDispatcher dispatcher = new SimpleDispatcher(); final CountDownLatch latch = new CountDownLatch(1); - dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch))); + dispatcher.addConsumer(createEndpoint(TestHandlers.countDownHandler(latch))); dispatcher.dispatch(new StringMessage("test")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); @@ -57,8 +57,8 @@ public class SimpleDispatcherTests { final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter1 = new AtomicInteger(); final AtomicInteger counter2 = new AtomicInteger(); - dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); - dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); + dispatcher.addConsumer(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); + dispatcher.addConsumer(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); dispatcher.dispatch(new StringMessage("test")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); @@ -70,8 +70,8 @@ public class SimpleDispatcherTests { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageConsumer target = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target); - dispatcher.subscribe(target); + dispatcher.addConsumer(target); + dispatcher.addConsumer(target); try { dispatcher.dispatch(new StringMessage("test")); } @@ -82,16 +82,16 @@ public class SimpleDispatcherTests { } @Test - public void unsubscribeBeforeSend() { + public void removeConsumerBeforeSend() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageConsumer target1 = new CountingTestEndpoint(counter, false); MessageConsumer target2 = new CountingTestEndpoint(counter, false); MessageConsumer target3 = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); - dispatcher.unsubscribe(target2); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); + dispatcher.removeConsumer(target2); try { dispatcher.dispatch(new StringMessage("test")); } @@ -102,15 +102,15 @@ public class SimpleDispatcherTests { } @Test - public void unsubscribeBetweenSends() { + public void removeConsumerBetweenSends() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageConsumer target1 = new CountingTestEndpoint(counter, false); MessageConsumer target2 = new CountingTestEndpoint(counter, false); MessageConsumer target3 = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); try { dispatcher.dispatch(new StringMessage("test1")); } @@ -118,7 +118,7 @@ public class SimpleDispatcherTests { // ignore } assertEquals(3, counter.get()); - dispatcher.unsubscribe(target2); + dispatcher.removeConsumer(target2); try { dispatcher.dispatch(new StringMessage("test2")); } @@ -126,7 +126,7 @@ public class SimpleDispatcherTests { // ignore } assertEquals(5, counter.get()); - dispatcher.unsubscribe(target1); + dispatcher.removeConsumer(target1); try { dispatcher.dispatch(new StringMessage("test3")); } @@ -137,11 +137,11 @@ public class SimpleDispatcherTests { } @Test(expected = MessageDeliveryException.class) - public void unsubscribeLastTargetCausesDeliveryException() { + public void removeConsumerLastTargetCausesDeliveryException() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageConsumer target = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target); + dispatcher.addConsumer(target); try { dispatcher.dispatch(new StringMessage("test1")); } @@ -149,7 +149,7 @@ public class SimpleDispatcherTests { // ignore } assertEquals(1, counter.get()); - dispatcher.unsubscribe(target); + dispatcher.removeConsumer(target); dispatcher.dispatch(new StringMessage("test2")); } @@ -167,9 +167,9 @@ public class SimpleDispatcherTests { endpoint1.setSelector(new TestMessageSelector(selectorCounter, false)); endpoint2.setSelector(new TestMessageSelector(selectorCounter, false)); endpoint3.setSelector(new TestMessageSelector(selectorCounter, true)); - dispatcher.subscribe(endpoint1); - dispatcher.subscribe(endpoint2); - dispatcher.subscribe(endpoint3); + dispatcher.addConsumer(endpoint1); + dispatcher.addConsumer(endpoint2); + dispatcher.addConsumer(endpoint3); dispatcher.dispatch(new StringMessage("test")); assertEquals(0, latch.getCount()); assertEquals("selectors should have been invoked one time each", 3, selectorCounter.get()); @@ -192,9 +192,9 @@ public class SimpleDispatcherTests { endpoint1.setSelector(new TestMessageSelector(selectorCounter, false)); endpoint2.setSelector(new TestMessageSelector(selectorCounter, false)); endpoint3.setSelector(new TestMessageSelector(selectorCounter, false)); - dispatcher.subscribe(endpoint1); - dispatcher.subscribe(endpoint2); - dispatcher.subscribe(endpoint3); + dispatcher.addConsumer(endpoint1); + dispatcher.addConsumer(endpoint2); + dispatcher.addConsumer(endpoint3); boolean exceptionThrown = false; try { dispatcher.dispatch(new StringMessage("test")); @@ -216,9 +216,9 @@ public class SimpleDispatcherTests { MessageConsumer target1 = new CountingTestEndpoint(counter, true); MessageConsumer target2 = new CountingTestEndpoint(counter, false); MessageConsumer target3 = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); assertTrue(dispatcher.dispatch(new StringMessage("test"))); assertEquals("only the first target should have been invoked", 1, counter.get()); } @@ -230,9 +230,9 @@ public class SimpleDispatcherTests { MessageConsumer target1 = new CountingTestEndpoint(counter, false); MessageConsumer target2 = new CountingTestEndpoint(counter, true); MessageConsumer target3 = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); assertTrue(dispatcher.dispatch(new StringMessage("test"))); assertEquals("first two targets should have been invoked", 2, counter.get()); } @@ -244,9 +244,9 @@ public class SimpleDispatcherTests { MessageConsumer target1 = new CountingTestEndpoint(counter, false); MessageConsumer target2 = new CountingTestEndpoint(counter, false); MessageConsumer target3 = new CountingTestEndpoint(counter, false); - dispatcher.subscribe(target1); - dispatcher.subscribe(target2); - dispatcher.subscribe(target3); + dispatcher.addConsumer(target1); + dispatcher.addConsumer(target2); + dispatcher.addConsumer(target3); try { assertFalse(dispatcher.dispatch(new StringMessage("test"))); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java index a7565bd7cd..ad71df2668 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java @@ -49,7 +49,7 @@ public class ChannelPollerTests { @Before public void init() { poller = new ChannelPoller(channelMock, triggerMock); - poller.subscribe(endpointMock); + poller.setConsumer(endpointMock); poller.setReceiveTimeout(-1); reset(globalMocks); } @@ -113,7 +113,7 @@ public class ChannelPollerTests { @Test public void blockingSourceTimedOut() { poller = new ChannelPoller(channelMock, triggerMock); - poller.subscribe(endpointMock); + poller.setConsumer(endpointMock); // we don't need to await the timeout, returning null suffices expect(channelMock.receive(1)).andReturn(null); replay(globalMocks); @@ -125,7 +125,7 @@ public class ChannelPollerTests { @Test public void blockingSourceNotTimedOut() { poller = new ChannelPoller(channelMock, triggerMock); - poller.subscribe(endpointMock); + poller.setConsumer(endpointMock); expect(channelMock.receive(1)).andReturn(messageMock); endpointMock.onMessage(messageMock); expectLastCall();