Removed DispatcherPolicy (INT-292).
This commit is contained in:
@@ -66,7 +66,7 @@ public class PriorityChannelTests {
|
||||
|
||||
@Test
|
||||
public void testCustomComparator() {
|
||||
PriorityChannel channel = new PriorityChannel(5, null, new StringPayloadComparator());
|
||||
PriorityChannel channel = new PriorityChannel(5, new StringPayloadComparator());
|
||||
Message<?> messageA = new StringMessage("A");
|
||||
Message<?> messageB = new StringMessage("B");
|
||||
Message<?> messageC = new StringMessage("C");
|
||||
|
||||
@@ -26,7 +26,6 @@ import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.FatalBeanException;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.integration.channel.DispatcherPolicy;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
import org.springframework.integration.config.TestChannelInterceptor;
|
||||
@@ -88,29 +87,6 @@ public class ChannelParserTests {
|
||||
assertEquals(taskExecutorBean, taskExecutorProperty);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultDispatcherPolicy() throws InterruptedException {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"channelParserTests.xml", this.getClass());
|
||||
MessageChannel channel = (MessageChannel) context.getBean("queueChannelByDefault");
|
||||
DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy();
|
||||
assertFalse(dispatcherPolicy.isPublishSubscribe());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_REJECTION_LIMIT, dispatcherPolicy.getRejectionLimit());
|
||||
assertEquals(DispatcherPolicy.DEFAULT_RETRY_INTERVAL, dispatcherPolicy.getRetryInterval());
|
||||
assertTrue(dispatcherPolicy.getShouldFailOnRejectionLimit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDispatcherPolicyConfiguration() throws InterruptedException {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"channelParserTests.xml", this.getClass());
|
||||
MessageChannel channel = (MessageChannel) context.getBean("channelWithDispatcherPolicy");
|
||||
DispatcherPolicy dispatcherPolicy = channel.getDispatcherPolicy();
|
||||
assertEquals(7, dispatcherPolicy.getRejectionLimit());
|
||||
assertEquals(77, dispatcherPolicy.getRetryInterval());
|
||||
assertFalse(dispatcherPolicy.getShouldFailOnRejectionLimit());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDatatypeChannelWithCorrectType() {
|
||||
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
|
||||
@@ -16,12 +16,6 @@
|
||||
<publish-subscribe-channel id="publishSubscribeChannelWithTaskExecutorRef"
|
||||
task-executor="taskExecutor"/>
|
||||
|
||||
<channel id="channelWithDispatcherPolicy">
|
||||
<dispatcher-policy rejection-limit="7"
|
||||
retry-interval="77"
|
||||
should-fail-on-rejection-limit="false"/>
|
||||
</channel>
|
||||
|
||||
<channel id="integerChannel" datatype="java.lang.Integer"/>
|
||||
|
||||
<channel id="numberChannel" datatype="java.lang.Number"/>
|
||||
|
||||
@@ -28,14 +28,11 @@ import org.junit.Test;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext;
|
||||
import org.springframework.context.support.StaticApplicationContext;
|
||||
import org.springframework.integration.bus.DefaultChannelFactoryBean;
|
||||
import org.springframework.integration.bus.DefaultMessageBus;
|
||||
import org.springframework.integration.channel.AbstractMessageChannel;
|
||||
import org.springframework.integration.channel.ChannelInterceptor;
|
||||
import org.springframework.integration.channel.DispatcherPolicy;
|
||||
import org.springframework.integration.channel.MessageChannel;
|
||||
import org.springframework.integration.channel.PriorityChannel;
|
||||
import org.springframework.integration.channel.QueueChannel;
|
||||
@@ -53,8 +50,6 @@ public class ChannelFactoryTests {
|
||||
|
||||
private final ArrayList<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
|
||||
|
||||
private final DispatcherPolicy dispatcherPolicy = new DispatcherPolicy();
|
||||
|
||||
|
||||
@Before
|
||||
public void initInterceptorsList() {
|
||||
@@ -76,7 +71,7 @@ public class ChannelFactoryTests {
|
||||
DirectChannelFactory channelFactory = new DirectChannelFactory();
|
||||
assertNotNull(interceptors);
|
||||
AbstractMessageChannel channel = (AbstractMessageChannel)
|
||||
channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors);
|
||||
channelFactory.getChannel("testChannel", interceptors);
|
||||
assertEquals(DirectChannel.class, channel.getClass());
|
||||
assertEquals("testChannel", channel.getName());
|
||||
assertInterceptors(channel);
|
||||
@@ -99,7 +94,7 @@ public class ChannelFactoryTests {
|
||||
ThreadLocalChannelFactory channelFactory = new ThreadLocalChannelFactory();
|
||||
assertNotNull(interceptors);
|
||||
AbstractMessageChannel channel = (AbstractMessageChannel)
|
||||
channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors);
|
||||
channelFactory.getChannel("testChannel", interceptors);
|
||||
assertEquals(ThreadLocalChannel.class, channel.getClass());
|
||||
assertEquals("testChannel", channel.getName());
|
||||
assertInterceptors(channel);
|
||||
@@ -114,36 +109,21 @@ public class ChannelFactoryTests {
|
||||
BeanDefinitionBuilder messageBusDefinitionBuilder = BeanDefinitionBuilder.rootBeanDefinition(DefaultMessageBus.class);
|
||||
messageBusDefinitionBuilder.getBeanDefinition().getPropertyValues().addPropertyValue("channelFactory", channelFactory);
|
||||
applicationContext.registerBeanDefinition("messageBus", messageBusDefinitionBuilder.getBeanDefinition());
|
||||
DefaultChannelFactoryBean channelFactoryBean = new DefaultChannelFactoryBean(dispatcherPolicy);
|
||||
DefaultChannelFactoryBean channelFactoryBean = new DefaultChannelFactoryBean();
|
||||
channelFactoryBean.setBeanName("testChannel");
|
||||
channelFactoryBean.setApplicationContext(applicationContext);
|
||||
channelFactoryBean.setInterceptors(interceptors);
|
||||
StubChannel channel = (StubChannel) channelFactoryBean.getObject();
|
||||
assertEquals("testChannel", channel.getName());
|
||||
assertTrue(dispatcherPolicy == channel.getDispatcherPolicy());
|
||||
assertInterceptors(channel);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultChannelFactoryBeanInApplicationContext() throws Exception{
|
||||
ApplicationContext context = new ClassPathXmlApplicationContext(
|
||||
"defaultChannelFactoryBeanTests.xml", this.getClass());
|
||||
MessageChannel channel = (MessageChannel) context.getBean("testChannel");
|
||||
assertEquals(StubChannel.class, channel.getClass());
|
||||
assertEquals("testChannel", channel.getName());
|
||||
DispatcherPolicy dispatcherPolicy = (DispatcherPolicy) context.getBean("dispatcherPolicy");
|
||||
assertTrue(dispatcherPolicy == channel.getDispatcherPolicy());
|
||||
}
|
||||
|
||||
|
||||
private void genericChannelFactoryTests(ChannelFactory channelFactory, Class<?> expectedChannelClass) {
|
||||
assertNotNull(dispatcherPolicy);
|
||||
assertNotNull(interceptors);
|
||||
AbstractMessageChannel channel = (AbstractMessageChannel)
|
||||
channelFactory.getChannel("testChannel", dispatcherPolicy, interceptors);
|
||||
AbstractMessageChannel channel = (AbstractMessageChannel) channelFactory.getChannel("testChannel", interceptors);
|
||||
assertEquals(expectedChannelClass, channel.getClass());
|
||||
assertEquals("testChannel", channel.getName());
|
||||
assertTrue(channel.getDispatcherPolicy() == dispatcherPolicy);
|
||||
assertInterceptors(channel);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,7 +19,6 @@ package org.springframework.integration.channel.factory;
|
||||
import java.util.List;
|
||||
|
||||
import org.springframework.integration.channel.AbstractMessageChannel;
|
||||
import org.springframework.integration.channel.DispatcherPolicy;
|
||||
import org.springframework.integration.message.Message;
|
||||
import org.springframework.integration.message.selector.MessageSelector;
|
||||
|
||||
@@ -28,10 +27,6 @@ import org.springframework.integration.message.selector.MessageSelector;
|
||||
*/
|
||||
public class StubChannel extends AbstractMessageChannel {
|
||||
|
||||
public StubChannel(DispatcherPolicy dispatcherPolicy) {
|
||||
super(dispatcherPolicy);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message<?> doReceive(long timeout) {
|
||||
return null;
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.integration.channel.factory;
|
||||
|
||||
import org.springframework.integration.channel.AbstractMessageChannel;
|
||||
import org.springframework.integration.channel.DispatcherPolicy;
|
||||
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
@@ -25,8 +24,8 @@ import org.springframework.integration.channel.DispatcherPolicy;
|
||||
public class StubChannelFactory extends AbstractChannelFactory {
|
||||
|
||||
@Override
|
||||
protected AbstractMessageChannel createChannelInternal(DispatcherPolicy dispatcherPolicy) {
|
||||
return new StubChannel(dispatcherPolicy);
|
||||
protected AbstractMessageChannel createChannelInternal() {
|
||||
return new StubChannel();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
@@ -11,10 +11,6 @@
|
||||
|
||||
<beans:bean id="factory" class="org.springframework.integration.channel.factory.StubChannelFactory"/>
|
||||
|
||||
<beans:bean id="testChannel" class="org.springframework.integration.bus.DefaultChannelFactoryBean">
|
||||
<beans:constructor-arg ref="dispatcherPolicy"/>
|
||||
</beans:bean>
|
||||
|
||||
<beans:bean id="dispatcherPolicy" class="org.springframework.integration.channel.DispatcherPolicy"/>
|
||||
<beans:bean id="testChannel" class="org.springframework.integration.bus.DefaultChannelFactoryBean"/>
|
||||
|
||||
</beans:beans>
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
/*
|
||||
* Copyright 2002-2008 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.dispatcher;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.endpoint.HandlerEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.handler.TestHandlers;
|
||||
import org.springframework.integration.message.MessageTarget;
|
||||
import org.springframework.integration.message.StringMessage;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class BroadcastingDispatcherTests {
|
||||
|
||||
@Test
|
||||
public void testPublishSubscribe() throws InterruptedException {
|
||||
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals(1, counter1.get());
|
||||
assertEquals(1, counter2.get());
|
||||
}
|
||||
|
||||
|
||||
private static MessageTarget createEndpoint(MessageHandler handler) {
|
||||
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
|
||||
endpoint.start();
|
||||
return endpoint;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.springframework.integration.channel.DispatcherPolicy;
|
||||
import org.springframework.integration.endpoint.HandlerEndpoint;
|
||||
import org.springframework.integration.handler.MessageHandler;
|
||||
import org.springframework.integration.handler.TestHandlers;
|
||||
@@ -38,7 +37,7 @@ public class SimpleDispatcherTests {
|
||||
|
||||
@Test
|
||||
public void testSingleMessage() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy());
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countDownHandler(latch)));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
@@ -48,7 +47,7 @@ public class SimpleDispatcherTests {
|
||||
|
||||
@Test
|
||||
public void testPointToPoint() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy(false));
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
@@ -60,21 +59,6 @@ public class SimpleDispatcherTests {
|
||||
assertEquals("only 1 handler should have received the message", 1, counter1.get() + counter2.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishSubscribe() throws InterruptedException {
|
||||
SimpleDispatcher dispatcher = new SimpleDispatcher(new DispatcherPolicy(true));
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
final AtomicInteger counter1 = new AtomicInteger();
|
||||
final AtomicInteger counter2 = new AtomicInteger();
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter1, latch)));
|
||||
dispatcher.addTarget(createEndpoint(TestHandlers.countingCountDownHandler(counter2, latch)));
|
||||
dispatcher.send(new StringMessage("test"));
|
||||
latch.await(500, TimeUnit.MILLISECONDS);
|
||||
assertEquals(0, latch.getCount());
|
||||
assertEquals(1, counter1.get());
|
||||
assertEquals(1, counter2.get());
|
||||
}
|
||||
|
||||
|
||||
private static MessageTarget createEndpoint(MessageHandler handler) {
|
||||
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
|
||||
|
||||
@@ -18,16 +18,16 @@
|
||||
<si:channel id="replyChannel"/>
|
||||
<si:channel id="customErrorChannel"/>
|
||||
|
||||
<si:handler-endpoint input-channel="channel1WithOverride" ref="testBean" method="duplicate"
|
||||
<si:service-activator input-channel="channel1WithOverride" ref="testBean" method="duplicate"
|
||||
output-channel="channel2" return-address-overrides="true"/>
|
||||
|
||||
<si:handler-endpoint input-channel="channel3WithOverride" ref="testBean" method="duplicate"
|
||||
<si:service-activator input-channel="channel3WithOverride" ref="testBean" method="duplicate"
|
||||
return-address-overrides="true"/>
|
||||
|
||||
<si:handler-endpoint input-channel="channel1" ref="testBean" method="duplicate" output-channel="channel2"/>
|
||||
<si:handler-endpoint input-channel="channel2" ref="testBean" method="duplicate" output-channel="channel3"/>
|
||||
<si:handler-endpoint input-channel="channel3" ref="testBean" method="duplicate"/>
|
||||
<si:handler-endpoint input-channel="channel4" ref="testBean" method="duplicate" output-channel="replyChannel"/>
|
||||
<si:service-activator input-channel="channel1" ref="testBean" method="duplicate" output-channel="channel2"/>
|
||||
<si:service-activator input-channel="channel2" ref="testBean" method="duplicate" output-channel="channel3"/>
|
||||
<si:service-activator input-channel="channel3" ref="testBean" method="duplicate"/>
|
||||
<si:service-activator input-channel="channel4" ref="testBean" method="duplicate" output-channel="replyChannel"/>
|
||||
|
||||
<bean id="testBean" class="org.springframework.integration.endpoint.TestBean"/>
|
||||
|
||||
|
||||
Reference in New Issue
Block a user