Removed 'maxMessagesPerTask' and 'receiveTimeout' properties from DispatcherPolicy and added those same properties to PollingDispatcherTask. The MessageDispatcher interface now extends the MessageTarget interface. Therefore, the 'dispatch(Message)' method has been replaced with 'send(Message)'.
This commit is contained in:
@@ -131,8 +131,6 @@ public class ChannelParserTests {
|
||||
MessageChannel channel = (MessageChannel) context.getBean("pointToPointChannelByDefault");
|
||||
DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy();
|
||||
assertFalse(dispatcherPolicy.isPublishSubscribe());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_MAX_MESSAGES_PER_TASK, dispatcherPolicy.getMaxMessagesPerTask());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_RECEIVE_TIMEOUT, dispatcherPolicy.getReceiveTimeout());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_REJECTION_LIMIT, dispatcherPolicy.getRejectionLimit());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_RETRY_INTERVAL, dispatcherPolicy.getRetryInterval());
|
||||
assertTrue(dispatcherPolicy.getShouldFailOnRejectionLimit());
|
||||
@@ -145,10 +143,8 @@ public class ChannelParserTests {
|
||||
MessageChannel channel = (MessageChannel) context.getBean("channelWithDispatcherPolicy");
|
||||
DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy();
|
||||
assertTrue(dispatcherPolicy.isPublishSubscribe());
|
||||
assertEquals(7, dispatcherPolicy.getMaxMessagesPerTask());
|
||||
assertEquals(77, dispatcherPolicy.getReceiveTimeout());
|
||||
assertEquals(777, dispatcherPolicy.getRejectionLimit());
|
||||
assertEquals(7777, dispatcherPolicy.getRetryInterval());
|
||||
assertEquals(7, dispatcherPolicy.getRejectionLimit());
|
||||
assertEquals(77, dispatcherPolicy.getRetryInterval());
|
||||
assertFalse(dispatcherPolicy.getShouldFailOnRejectionLimit());
|
||||
}
|
||||
|
||||
|
||||
@@ -16,10 +16,8 @@
|
||||
<channel id="publishSubscribeChannel" publish-subscribe="true"/>
|
||||
|
||||
<channel id="channelWithDispatcherPolicy" publish-subscribe="true">
|
||||
<dispatcher-policy max-messages-per-task="7"
|
||||
receive-timeout="77"
|
||||
rejection-limit="777"
|
||||
retry-interval="7777"
|
||||
<dispatcher-policy rejection-limit="7"
|
||||
retry-interval="77"
|
||||
should-fail-on-rejection-limit="false"/>
|
||||
</channel>
|
||||
|
||||
|
||||
@@ -60,11 +60,6 @@ public class ChannelFactoryTests {
|
||||
interceptors.add(new TestChannelInterceptor());
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initDispatcherPolicy() {
|
||||
dispatcherPolicy.setMaxMessagesPerTask(100);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQueueChannelFactory() {
|
||||
|
||||
@@ -41,7 +41,7 @@ public class SimpleDispatcherTests {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
dispatcher.subscribe(createEndpoint(TestHandlers.countDownHandler(latch)));
|
||||
dispatcher.dispatch(new StringMessage("test"));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
}
|
||||
@@ -54,7 +54,7 @@ public class SimpleDispatcherTests {
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
|
||||
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
|
||||
dispatcher.dispatch(new StringMessage("test"));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals("only 1 handler should have received the message", 1, counter1.get() + counter2.get());
|
||||
@@ -68,7 +68,7 @@ public class SimpleDispatcherTests {
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
|
||||
dispatcher.subscribe(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
|
||||
dispatcher.dispatch(new StringMessage("test"));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals(1, counter1.get());
|
||||
|
||||
Reference in New Issue
Block a user