IN PROGRESS - issue INT-567: Add round-robin dispatching strategy

http://jira.springframework.org/browse/INT-567

- refactored AbstractDispatcher to use Queue instead of Set
- added AbstractSendOnceDispatcher (for DirectChannel parametrization)
- refactored DirectChannel to use LoadBalancing Dispatcher by default
- renamed SimpleDispatcher to FailOverDispatcher
This commit is contained in:
Iwein Fuld
2009-03-07 19:29:01 +00:00
parent dabb76ac26
commit 242d8291ec
10 changed files with 181 additions and 118 deletions

View File

@@ -16,9 +16,11 @@
package org.springframework.integration.channel.config;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -35,6 +37,8 @@ import org.springframework.integration.config.TestChannelInterceptor;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessagePriority;
import org.springframework.integration.dispatcher.FailOverDispatcher;
import org.springframework.integration.dispatcher.LoadBalancingDispatcher;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageDeliveryException;
@@ -43,18 +47,19 @@ import org.springframework.integration.util.ErrorHandlingTaskExecutor;
/**
* @author Mark Fisher
* @author Iwein Fuld
*/
public class ChannelParserTests {
@Test(expected=FatalBeanException.class)
@Test(expected = FatalBeanException.class)
public void testChannelWithoutId() {
new ClassPathXmlApplicationContext("channelWithoutId.xml", this.getClass());
}
@Test
public void testChannelWithCapacity() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("capacityChannel");
for (int i = 0; i < 10; i++) {
boolean result = channel.send(new GenericMessage<String>("test"), 10);
@@ -65,31 +70,43 @@ public class ChannelParserTests {
@Test
public void testDirectChannelByDefault() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("defaultChannel");
assertEquals(DirectChannel.class, channel.getClass());
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
assertThat(accessor.getPropertyValue("dispatcher"), is(LoadBalancingDispatcher.class));
}
@Test
public void channelWithRoundRobinDispatcher() throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("failOverChannel");
assertEquals(DirectChannel.class, channel.getClass());
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
assertThat(accessor.getPropertyValue("dispatcher"), is(FailOverDispatcher.class));
}
@Test
public void testPublishSubscribeChannel() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("publishSubscribeChannel");
assertEquals(PublishSubscribeChannel.class, channel.getClass());
}
@Test
public void testPublishSubscribeChannelWithTaskExecutorReference() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("publishSubscribeChannelWithTaskExecutorRef");
assertEquals(PublishSubscribeChannel.class, channel.getClass());
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
accessor = new DirectFieldAccessor(accessor.getPropertyValue("dispatcher"));
Object taskExecutorProperty = accessor.getPropertyValue("taskExecutor");
assertNotNull(taskExecutorProperty);
assertEquals(ErrorHandlingTaskExecutor.class, taskExecutorProperty.getClass());
assertEquals(ErrorHandlingTaskExecutor.class, taskExecutorProperty.getClass());
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(taskExecutorProperty);
TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor");
Object taskExecutorBean = context.getBean("taskExecutor");
@@ -98,24 +115,24 @@ public class ChannelParserTests {
@Test
public void testDatatypeChannelWithCorrectType() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("integerChannel");
assertTrue(channel.send(new GenericMessage<Integer>(123)));
}
@Test(expected=MessageDeliveryException.class)
@Test(expected = MessageDeliveryException.class)
public void testDatatypeChannelWithIncorrectType() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("integerChannel");
channel.send(new StringMessage("incorrect type"));
}
@Test
public void testDatatypeChannelWithAssignableSubTypes() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("numberChannel");
assertTrue(channel.send(new GenericMessage<Integer>(123)));
assertTrue(channel.send(new GenericMessage<Double>(123.45)));
@@ -123,25 +140,25 @@ public class ChannelParserTests {
@Test
public void testMultipleDatatypeChannelWithCorrectTypes() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("stringOrNumberChannel");
assertTrue(channel.send(new GenericMessage<Integer>(123)));
assertTrue(channel.send(new StringMessage("accepted type")));
}
@Test(expected=MessageDeliveryException.class)
@Test(expected = MessageDeliveryException.class)
public void testMultipleDatatypeChannelWithIncorrectType() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"channelParserTests.xml", this.getClass());
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this
.getClass());
MessageChannel channel = (MessageChannel) context.getBean("stringOrNumberChannel");
channel.send(new GenericMessage<Boolean>(true));
}
@Test
public void testChannelInteceptorRef() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"channelInterceptorParserTests.xml", this.getClass());
ApplicationContext context = new ClassPathXmlApplicationContext("channelInterceptorParserTests.xml", this
.getClass());
PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorRef");
TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor");
assertEquals(0, interceptor.getSendCount());
@@ -154,8 +171,8 @@ public class ChannelParserTests {
@Test
public void testChannelInteceptorInnerBean() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"channelInterceptorParserTests.xml", this.getClass());
ApplicationContext context = new ClassPathXmlApplicationContext("channelInterceptorParserTests.xml", this
.getClass());
PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorInnerBean");
channel.send(new StringMessage("test"));
Message<?> transformed = channel.receive(1000);
@@ -164,15 +181,14 @@ public class ChannelParserTests {
@Test
public void testPriorityChannelWithDefaultComparator() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this
.getClass());
PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithDefaultComparator");
Message<String> lowPriorityMessage = MessageBuilder.withPayload("low")
.setPriority(MessagePriority.LOW).build();
Message<String> midPriorityMessage = MessageBuilder.withPayload("mid")
.setPriority(MessagePriority.NORMAL).build();
Message<String> highPriorityMessage = MessageBuilder.withPayload("high")
.setPriority(MessagePriority.HIGH).build();
Message<String> lowPriorityMessage = MessageBuilder.withPayload("low").setPriority(MessagePriority.LOW).build();
Message<String> midPriorityMessage = MessageBuilder.withPayload("mid").setPriority(MessagePriority.NORMAL)
.build();
Message<String> highPriorityMessage = MessageBuilder.withPayload("high").setPriority(MessagePriority.HIGH)
.build();
channel.send(lowPriorityMessage);
channel.send(highPriorityMessage);
channel.send(midPriorityMessage);
@@ -186,8 +202,8 @@ public class ChannelParserTests {
@Test
public void testPriorityChannelWithCustomComparator() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this
.getClass());
PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithCustomComparator");
channel.send(new StringMessage("C"));
channel.send(new StringMessage("A"));
@@ -205,8 +221,8 @@ public class ChannelParserTests {
@Test
public void testPriorityChannelWithIntegerDatatypeEnforced() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this
.getClass());
PollableChannel channel = (PollableChannel) context.getBean("integerOnlyPriorityChannel");
channel.send(new GenericMessage<Integer>(3));
channel.send(new GenericMessage<Integer>(2));

View File

@@ -1,35 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<channel id="capacityChannel">
<queue capacity="10"/>
<queue capacity="10" />
</channel>
<channel id="defaultChannel"/>
<publish-subscribe-channel id="publishSubscribeChannel"/>
<channel id="defaultChannel" />
<channel id="failOverChannel" dispatcher="fail-over"/>
<publish-subscribe-channel id="publishSubscribeChannel" />
<publish-subscribe-channel id="publishSubscribeChannelWithTaskExecutorRef"
task-executor="taskExecutor"/>
task-executor="taskExecutor" />
<channel id="integerChannel" datatype="java.lang.Integer">
<queue capacity="10"/>
<queue capacity="10" />
</channel>
<channel id="numberChannel" datatype="java.lang.Number">
<queue capacity="10"/>
<queue capacity="10" />
</channel>
<channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number">
<queue capacity="10"/>
<queue capacity="10" />
</channel>
<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"/>
<beans:bean id="taskExecutor"
class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</beans:beans>

View File

@@ -37,11 +37,11 @@ import org.springframework.integration.message.TestHandlers;
/**
* @author Mark Fisher
*/
public class SimpleDispatcherTests {
public class FailOverDispatcherTests {
@Test
public void singleMessage() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
dispatcher.addHandler(createConsumer(TestHandlers.countDownHandler(latch)));
dispatcher.dispatch(new StringMessage("test"));
@@ -51,7 +51,7 @@ public class SimpleDispatcherTests {
@Test
public void pointToPoint() throws InterruptedException {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger counter1 = new AtomicInteger();
final AtomicInteger counter2 = new AtomicInteger();
@@ -65,7 +65,7 @@ public class SimpleDispatcherTests {
@Test
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target);
@@ -81,7 +81,7 @@ public class SimpleDispatcherTests {
@Test
public void removeConsumerBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
@@ -101,7 +101,7 @@ public class SimpleDispatcherTests {
@Test
public void removeConsumerBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
@@ -136,7 +136,7 @@ public class SimpleDispatcherTests {
@Test(expected = MessageDeliveryException.class)
public void removeConsumerLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target = new CountingTestEndpoint(counter, false);
dispatcher.addHandler(target);
@@ -153,7 +153,7 @@ public class SimpleDispatcherTests {
@Test
public void firstHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target1 = new CountingTestEndpoint(counter, true);
MessageHandler target2 = new CountingTestEndpoint(counter, false);
@@ -167,7 +167,7 @@ public class SimpleDispatcherTests {
@Test
public void middleHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, true);
@@ -181,7 +181,7 @@ public class SimpleDispatcherTests {
@Test
public void allHandlersReturnFalse() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
FailOverDispatcher dispatcher = new FailOverDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageHandler target1 = new CountingTestEndpoint(counter, false);
MessageHandler target2 = new CountingTestEndpoint(counter, false);