diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java index 2fc5768574..1e55ccd6ae 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java @@ -61,9 +61,9 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(3); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(3); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -80,9 +80,9 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(2); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -98,10 +98,10 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(5); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(5); + task.setReceiveTimeout(0); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -117,10 +117,10 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(2); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(2); + task.setReceiveTimeout(0); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -141,10 +141,10 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(5); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(5); + task.setReceiveTimeout(0); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -164,10 +164,10 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(2); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(2); + task.setReceiveTimeout(0); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); @@ -187,10 +187,10 @@ public class ByteStreamTargetTests { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(2); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(2); + task.setReceiveTimeout(0); task.subscribe(target); channel.send(new GenericMessage(new byte[] {1,2,3}), 0); channel.send(new GenericMessage(new byte[] {4,5,6}), 0); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java index 4ec1f806b4..40f93800ec 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java @@ -79,9 +79,9 @@ public class CharacterStreamTargetTests { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(2); task.subscribe(target); channel.send(new StringMessage("foo"), 0); channel.send(new StringMessage("bar"), 0); @@ -94,10 +94,10 @@ public class CharacterStreamTargetTests { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setMaxMessagesPerTask(10); - dispatcherPolicy.setReceiveTimeout(0); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setMaxMessagesPerTask(10); + task.setReceiveTimeout(0); task.subscribe(target); target.setShouldAppendNewLine(true); channel.send(new StringMessage("foo"), 0); @@ -125,10 +125,10 @@ public class CharacterStreamTargetTests { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setReceiveTimeout(0); - dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setReceiveTimeout(0); + task.setMaxMessagesPerTask(2); task.subscribe(target); TestObject testObject1 = new TestObject("foo"); TestObject testObject2 = new TestObject("bar"); @@ -143,11 +143,11 @@ public class CharacterStreamTargetTests { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(); - dispatcherPolicy.setReceiveTimeout(0); - dispatcherPolicy.setMaxMessagesPerTask(2); QueueChannel channel = new QueueChannel(5, dispatcherPolicy); target.setShouldAppendNewLine(true); PollingDispatcherTask task = new PollingDispatcherTask(channel, null); + task.setReceiveTimeout(0); + task.setMaxMessagesPerTask(2); task.subscribe(target); TestObject testObject1 = new TestObject("foo"); TestObject testObject2 = new TestObject("bar"); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java index 1cdf01139e..5b0c9fe816 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DispatcherPolicy.java @@ -26,10 +26,6 @@ import org.springframework.util.Assert; */ public class DispatcherPolicy { - public final static int DEFAULT_MAX_MESSAGES_PER_TASK = 1; - - public final static long DEFAULT_RECEIVE_TIMEOUT = 1000; - public final static int DEFAULT_REJECTION_LIMIT = 5; public final static long DEFAULT_RETRY_INTERVAL = 1000; @@ -37,10 +33,6 @@ public class DispatcherPolicy { private final boolean publishSubscribe; - private volatile int maxMessagesPerTask = DEFAULT_MAX_MESSAGES_PER_TASK; - - private volatile long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT; - private volatile int rejectionLimit = DEFAULT_REJECTION_LIMIT; private volatile long retryInterval = DEFAULT_RETRY_INTERVAL; @@ -72,35 +64,6 @@ public class DispatcherPolicy { return this.publishSubscribe; } - /** - * Return the maximum number of messages for each retrieval attempt. - */ - public int getMaxMessagesPerTask() { - return this.maxMessagesPerTask; - } - - /** - * Set the maximum number of messages for each retrieval attempt. - */ - public void setMaxMessagesPerTask(int maxMessagesPerTask) { - Assert.isTrue(maxMessagesPerTask > 0, "'maxMessagePerTask' must be at least 1"); - this.maxMessagesPerTask = maxMessagesPerTask; - } - - /** - * Return the maximum amount of time in milliseconds to wait for a message to be available. - */ - public long getReceiveTimeout() { - return this.receiveTimeout; - } - - /** - * Set the maximum amount of time in milliseconds to wait for a message to be available. - */ - public void setReceiveTimeout(long receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - /** * Return the maximum number of retries upon rejection. */ diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java index eb49d1b6db..5696741cda 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java @@ -88,8 +88,6 @@ public class ThreadLocalChannel extends AbstractMessageChannel { private static DispatcherPolicy defaultDispatcherPolicy() { DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(false); - dispatcherPolicy.setMaxMessagesPerTask(1); - dispatcherPolicy.setReceiveTimeout(0); dispatcherPolicy.setRejectionLimit(1); dispatcherPolicy.setRetryInterval(0); dispatcherPolicy.setShouldFailOnRejectionLimit(false); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java index 03efddafdb..aec8dd275a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/config/AbstractChannelParser.java @@ -49,6 +49,7 @@ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinition private static final String INTERCEPTORS_PROPERTY = "interceptors"; + @Override protected boolean shouldGenerateId() { return false; @@ -104,14 +105,6 @@ public abstract class AbstractChannelParser extends AbstractSingleBeanDefinition } private void configureDispatcherPolicy(Element element, DispatcherPolicy dispatcherPolicy) { - String maxMessagesPerTask = element.getAttribute("max-messages-per-task"); - if (StringUtils.hasText(maxMessagesPerTask)) { - dispatcherPolicy.setMaxMessagesPerTask(Integer.parseInt(maxMessagesPerTask)); - } - String receiveTimeout = element.getAttribute("receive-timeout"); - if (StringUtils.hasText(receiveTimeout)) { - dispatcherPolicy.setReceiveTimeout(Long.parseLong(receiveTimeout)); - } String rejectionLimit = element.getAttribute("rejection-limit"); if (StringUtils.hasText(rejectionLimit)) { dispatcherPolicy.setRejectionLimit(Integer.parseInt(rejectionLimit)); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index b6bb72f416..8247de852d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -295,8 +295,6 @@ Defines a dispatcher policy. - - diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java index 084d624d10..8f09a993b4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java @@ -86,7 +86,7 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl @Override protected boolean doSend(Message message, long timeout) { if (message != null && this.handlerCount.get() > 0) { - return this.dispatcher.dispatch(message); + return this.dispatcher.send(message); } return false; } @@ -102,8 +102,6 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl private static DispatcherPolicy defaultDispatcherPolicy() { DispatcherPolicy dispatcherPolicy = new DispatcherPolicy(false); - dispatcherPolicy.setMaxMessagesPerTask(1); - dispatcherPolicy.setReceiveTimeout(0); dispatcherPolicy.setRejectionLimit(1); dispatcherPolicy.setRetryInterval(0); dispatcherPolicy.setShouldFailOnRejectionLimit(false); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java index 6bc4b74b13..5adeb2e3fe 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/MessageDispatcher.java @@ -17,15 +17,16 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; /** * Strategy interface for dispatching messages. * * @author Mark Fisher */ -public interface MessageDispatcher { +public interface MessageDispatcher extends MessageTarget { - boolean dispatch(Message message); + boolean send(Message message); void setSendTimeout(long timeout); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java index 6933727d75..2f2cfb1f81 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcherTask.java @@ -37,6 +37,10 @@ public class PollingDispatcherTask implements SchedulableTask, Subscribable { private final SimpleDispatcher dispatcher; + private volatile long receiveTimeout = -1; + + private volatile int maxMessagesPerTask = 1; + public PollingDispatcherTask(MessageChannel channel, Schedule schedule) { Assert.notNull(channel, "channel must not be null"); @@ -46,6 +50,22 @@ public class PollingDispatcherTask implements SchedulableTask, Subscribable { } + /** + * Set the maximum amount of time in milliseconds to wait for a message to be available. + * A negative value indicates that receive calls should block indefinitely. + */ + public void setReceiveTimeout(long receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + /** + * Set the maximum number of messages for each retrieval attempt. + */ + public void setMaxMessagesPerTask(int maxMessagesPerTask) { + Assert.isTrue(maxMessagesPerTask > 0, "'maxMessagePerTask' must be at least 1"); + this.maxMessagesPerTask = maxMessagesPerTask; + } + public boolean subscribe(MessageTarget target) { return this.dispatcher.subscribe(target); } @@ -59,15 +79,14 @@ public class PollingDispatcherTask implements SchedulableTask, Subscribable { } public void run() { - long timeout = this.channel.getDispatcherPolicy().getReceiveTimeout(); - int limit = this.channel.getDispatcherPolicy().getMaxMessagesPerTask(); int count = 0; - while (count < limit) { - Message message = (timeout < 0) ? this.channel.receive() : this.channel.receive(timeout); + while (count < this.maxMessagesPerTask) { + Message message = (this.receiveTimeout < 0) ? + this.channel.receive() : this.channel.receive(this.receiveTimeout); if (message == null) { return; } - this.dispatcher.dispatch(message); + this.dispatcher.send(message); count++; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index d0677318dd..86e8fb051e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -30,7 +30,6 @@ import org.springframework.integration.handler.MessageHandlerRejectedExecutionEx import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; -import org.springframework.integration.message.Subscribable; import org.springframework.integration.message.MessageTarget; /** @@ -38,7 +37,7 @@ import org.springframework.integration.message.MessageTarget; * * @author Mark Fisher */ -public class SimpleDispatcher implements MessageDispatcher, Subscribable { +public class SimpleDispatcher implements MessageDispatcher { protected final Log logger = LogFactory.getLog(this.getClass()); @@ -66,7 +65,7 @@ public class SimpleDispatcher implements MessageDispatcher, Subscribable { return this.targets.remove(target); } - public boolean dispatch(Message message) { + public boolean send(Message message) { int attempts = 0; List targetList = new ArrayList(this.targets); while (attempts < this.dispatcherPolicy.getRejectionLimit()) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java index d15619c9a5..ee208a6b47 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java @@ -67,7 +67,7 @@ public class SourceEndpoint extends AbstractEndpoint { if (message == null) { return false; } - boolean sent = this.dispatcher.dispatch(message); + boolean sent = this.dispatcher.send(message); if (this.source instanceof MessageDeliveryAware) { if (sent) { ((MessageDeliveryAware) this.source).onSend(message); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 4306021381..b045853ff1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -131,8 +131,6 @@ public class ChannelParserTests { MessageChannel channel = (MessageChannel) context.getBean("pointToPointChannelByDefault"); DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy(); assertFalse(dispatcherPolicy.isPublishSubscribe()); - assertEquals(DispatcherPolicy.DEFAULT_MAX_MESSAGES_PER_TASK, dispatcherPolicy.getMaxMessagesPerTask()); - assertEquals(DispatcherPolicy.DEFAULT_RECEIVE_TIMEOUT, dispatcherPolicy.getReceiveTimeout()); assertEquals(DispatcherPolicy.DEFAULT_REJECTION_LIMIT, dispatcherPolicy.getRejectionLimit()); assertEquals(DispatcherPolicy.DEFAULT_RETRY_INTERVAL, dispatcherPolicy.getRetryInterval()); assertTrue(dispatcherPolicy.getShouldFailOnRejectionLimit()); @@ -145,10 +143,8 @@ public class ChannelParserTests { MessageChannel channel = (MessageChannel) context.getBean("channelWithDispatcherPolicy"); DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy(); assertTrue(dispatcherPolicy.isPublishSubscribe()); - assertEquals(7, dispatcherPolicy.getMaxMessagesPerTask()); - assertEquals(77, dispatcherPolicy.getReceiveTimeout()); - assertEquals(777, dispatcherPolicy.getRejectionLimit()); - assertEquals(7777, dispatcherPolicy.getRetryInterval()); + assertEquals(7, dispatcherPolicy.getRejectionLimit()); + assertEquals(77, dispatcherPolicy.getRetryInterval()); assertFalse(dispatcherPolicy.getShouldFailOnRejectionLimit()); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml index d1cbbf5b64..45194b4dec 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml @@ -16,10 +16,8 @@ - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java index fea51e9e67..d15b66c2c9 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/ChannelFactoryTests.java @@ -60,11 +60,6 @@ public class ChannelFactoryTests { interceptors.add(new TestChannelInterceptor()); } - @Before - public void initDispatcherPolicy() { - dispatcherPolicy.setMaxMessagesPerTask(100); - } - @Test public void testQueueChannelFactory() { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index f8aaf2daba..970326abf0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -41,7 +41,7 @@ public class SimpleDispatcherTests { SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy()); final CountDownLatch latch = new CountDownLatch(1); dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch))); - dispatcher.dispatch(new StringMessage("test")); + dispatcher.send(new StringMessage("test")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); } @@ -54,7 +54,7 @@ public class SimpleDispatcherTests { final AtomicInteger counter2 = new AtomicInteger(); dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); - dispatcher.dispatch(new StringMessage("test")); + dispatcher.send(new StringMessage("test")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); assertEquals("only 1 handler should have received the message", 1, counter1.get() + counter2.get()); @@ -68,7 +68,7 @@ public class SimpleDispatcherTests { final AtomicInteger counter2 = new AtomicInteger(); dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch))); dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch))); - dispatcher.dispatch(new StringMessage("test")); + dispatcher.send(new StringMessage("test")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); assertEquals(1, counter1.get());