diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java index 00a56990c0..2574bb938d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -16,7 +16,8 @@ package org.springframework.integration.channel; -import org.springframework.integration.dispatcher.SimpleDispatcher; +import org.springframework.integration.dispatcher.AbstractSendOnceDispatcher; +import org.springframework.integration.dispatcher.LoadBalancingDispatcher; /** * A channel that invokes a single subscriber for each sent Message. @@ -24,11 +25,15 @@ import org.springframework.integration.dispatcher.SimpleDispatcher; * * @author Dave Syer * @author Mark Fisher + * @author Iwein Fuld */ -public class DirectChannel extends AbstractSubscribableChannel { +public class DirectChannel extends AbstractSubscribableChannel { public DirectChannel() { - super(new SimpleDispatcher()); + super(new LoadBalancingDispatcher()); + } + public DirectChannel(AbstractSendOnceDispatcher dispatcher){ + super(dispatcher); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java index 6a664b203c..9d151dbb60 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java @@ -18,7 +18,9 @@ package org.springframework.integration.config.xml; import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; @@ -33,6 +35,7 @@ public class PointToPointChannelParser extends AbstractChannelParser { private static final String CHANNEL_PACKAGE = IntegrationNamespaceUtils.BASE_PACKAGE + ".channel"; + private static final String DISPATCHER_PACKAGE = IntegrationNamespaceUtils.BASE_PACKAGE + ".dispatcher"; @Override protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) { @@ -55,10 +58,24 @@ public class PointToPointChannelParser extends AbstractChannelParser { } else { builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".DirectChannel"); + parseDispatcher(element.getAttribute("dispatcher"), builder, parserContext); } return builder; } + private void parseDispatcher(String dispatcherAttribute, BeanDefinitionBuilder builder, ParserContext parserContext) { + if (dispatcherAttribute != null) { + if (dispatcherAttribute.equals("fail-over")) { + BeanDefinitionBuilder dispatcherBuilder = BeanDefinitionBuilder + .genericBeanDefinition(DISPATCHER_PACKAGE + ".FailOverDispatcher"); + dispatcherBuilder.setRole(BeanDefinition.ROLE_SUPPORT); + builder.addConstructorArgReference(BeanDefinitionReaderUtils.registerWithGeneratedName(dispatcherBuilder + .getBeanDefinition(), parserContext.getRegistry())); + } + } + // rely on the default for round-robin + } + private void parseQueueCapacity(BeanDefinitionBuilder builder, Element queueElement) { String capacity = queueElement.getAttribute("capacity"); if (StringUtils.hasText(capacity)) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index 7e7afa2722..123c48de91 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -59,6 +59,7 @@ + @@ -163,6 +164,14 @@ + + + + + + + + @@ -789,7 +798,8 @@ - + @@ -823,7 +833,7 @@ - + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java index 8095abe59b..3facdfaeee 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractDispatcher.java @@ -16,14 +16,11 @@ package org.springframework.integration.dispatcher; -import java.util.Collection; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.springframework.core.task.TaskExecutor; import org.springframework.integration.core.Message; import org.springframework.integration.message.MessageHandler; @@ -40,22 +37,14 @@ public abstract class AbstractDispatcher implements MessageDispatcher { protected final Log logger = LogFactory.getLog(this.getClass()); - private final Set handlers = new CopyOnWriteArraySet(); + private final Queue handlers = new ConcurrentLinkedQueue(); private volatile TaskExecutor taskExecutor; - public boolean addHandler(MessageHandler handler) { - return this.handlers.add(handler); - } - - public boolean removeHandler(MessageHandler handler) { - return this.handlers.remove(handler); - } - /** - * Specify a {@link TaskExecutor} for invoking the handlers. - * If none is provided, the invocation will occur in the thread - * that runs this polling dispatcher. + * Specify a {@link TaskExecutor} for invoking the handlers. If none is + * provided, the invocation will occur in the thread that runs this polling + * dispatcher. */ public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; @@ -65,10 +54,21 @@ public abstract class AbstractDispatcher implements MessageDispatcher { return this.taskExecutor; } - protected Set getHandlers() { - return Collections.unmodifiableSet(handlers); + protected Queue getHandlers() { + return handlers; } - + + public boolean addHandler(MessageHandler handler) { + if (this.handlers.contains(handler)) { + return false; + } + return this.handlers.offer(handler); + } + + public boolean removeHandler(MessageHandler handler) { + return this.handlers.remove(handler); + } + public String toString() { return this.getClass().getSimpleName() + " with handlers: " + this.handlers; } @@ -78,7 +78,7 @@ public abstract class AbstractDispatcher implements MessageDispatcher { * "Selective Consumer" throws a {@link MessageRejectedException}. */ protected boolean sendMessageToHandler(Message message, MessageHandler handler) { - Assert.notNull(message, "'message' must not be null"); + Assert.notNull(message, "'message' must not be null."); Assert.notNull(handler, "'handler' must not be null."); try { handler.handleMessage(message); @@ -86,7 +86,12 @@ public abstract class AbstractDispatcher implements MessageDispatcher { } catch (MessageRejectedException e) { if (logger.isDebugEnabled()) { - logger.debug("Handler '" + handler + "' rejected Message, if other handlers are available this dispatcher may try to send to those.", e); + logger + .debug( + "Handler '" + + handler + + "' rejected Message, if other handlers are available this dispatcher may try to send to those.", + e); } return false; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractSendOnceDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractSendOnceDispatcher.java new file mode 100644 index 0000000000..5e12533e77 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/AbstractSendOnceDispatcher.java @@ -0,0 +1,9 @@ +package org.springframework.integration.dispatcher; + +import org.springframework.integration.core.Message; + +public abstract class AbstractSendOnceDispatcher extends AbstractDispatcher { + + public abstract boolean dispatch(Message message); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java similarity index 68% rename from org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java rename to org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java index 57ee896762..d248b8500a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/FailOverDispatcher.java @@ -16,42 +16,42 @@ package org.springframework.integration.dispatcher; +import java.util.Iterator; + import org.springframework.integration.core.Message; -import org.springframework.integration.message.MessageHandler; import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.message.MessageHandler; import org.springframework.integration.message.MessageRejectedException; /** - * Basic implementation of {@link MessageDispatcher} that will attempt - * to send a {@link Message} to one of its handlers. As soon as one - * of the handlers accepts the Message, the dispatcher will return 'true'. + * Basic implementation of {@link MessageDispatcher} that will attempt to send a + * {@link Message} to one of its handlers. As soon as one of the + * handlers accepts the Message, the dispatcher will return 'true'. *

- * If the dispatcher has no handlers, a {@link MessageDeliveryException} - * will be thrown. If all handlers reject the Message, the dispatcher will - * throw a MessageRejectedException. + * If the dispatcher has no handlers, a {@link MessageDeliveryException} will be + * thrown. If all handlers reject the Message, the dispatcher will throw a + * MessageRejectedException. * * @author Mark Fisher * @author Iwein Fuld */ -public class SimpleDispatcher extends AbstractDispatcher { +public class FailOverDispatcher extends AbstractSendOnceDispatcher { public boolean dispatch(Message message) { if (this.getHandlers().size() == 0) { throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } - int count = 0; - int rejectedExceptionCount = 0; - for (MessageHandler handler : this.getHandlers()) { - count++; - if (this.sendMessageToHandler(message, handler)) { - return true; + Iterator handlerIterator = this.getHandlers().iterator(); + boolean sent = false; + while (sent == false && handlerIterator.hasNext()) { + if (this.sendMessageToHandler(message, handlerIterator.next())) { + sent = true; } - rejectedExceptionCount++; } - if (rejectedExceptionCount == count) { + if (!sent) { throw new MessageRejectedException(message, "All of dispatcher's subscribers rejected Message."); } - return false; + return sent; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/LoadBalancingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/LoadBalancingDispatcher.java index f3b980100b..bf4918eec7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/LoadBalancingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/LoadBalancingDispatcher.java @@ -15,11 +15,8 @@ */ package org.springframework.integration.dispatcher; -import java.util.HashSet; -import java.util.Iterator; import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; import org.springframework.integration.core.Message; import org.springframework.integration.message.MessageDeliveryException; @@ -39,21 +36,24 @@ import org.springframework.integration.message.MessageRejectedException; * * @author Iwein Fuld */ -public class LoadBalancingDispatcher extends AbstractDispatcher { +public class LoadBalancingDispatcher extends AbstractSendOnceDispatcher { - private final Queue handlerQueue = new ConcurrentLinkedQueue(); + private ReentrantLock queueLock = new ReentrantLock(); public boolean dispatch(Message message) { - Set handlers = new HashSet(this.getHandlers()); + queueLock.lock(); + Queue handlers = this.getHandlers(); if (handlers.isEmpty()) { throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } - if (this.handlerQueue.isEmpty()){ - handlerQueue.addAll(handlers); - } boolean success = false; - while (!handlerQueue.isEmpty() && success == false) { - MessageHandler handler = handlerQueue.poll(); + int size = handlers.size(); + queueLock.unlock(); + for (int i = 0; i < size && success == false; i++) { + queueLock.lock(); + MessageHandler handler = handlers.poll(); + handlers.offer(handler); + queueLock.unlock(); if (this.sendMessageToHandler(message, handler)) { success = true; } @@ -63,5 +63,4 @@ public class LoadBalancingDispatcher extends AbstractDispatcher { } return success; } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 513c2476d4..3b34a12449 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -16,9 +16,11 @@ package org.springframework.integration.channel.config; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.junit.Test; @@ -35,6 +37,8 @@ import org.springframework.integration.config.TestChannelInterceptor; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessagePriority; +import org.springframework.integration.dispatcher.FailOverDispatcher; +import org.springframework.integration.dispatcher.LoadBalancingDispatcher; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageDeliveryException; @@ -43,18 +47,19 @@ import org.springframework.integration.util.ErrorHandlingTaskExecutor; /** * @author Mark Fisher + * @author Iwein Fuld */ public class ChannelParserTests { - @Test(expected=FatalBeanException.class) + @Test(expected = FatalBeanException.class) public void testChannelWithoutId() { new ClassPathXmlApplicationContext("channelWithoutId.xml", this.getClass()); } @Test public void testChannelWithCapacity() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("capacityChannel"); for (int i = 0; i < 10; i++) { boolean result = channel.send(new GenericMessage("test"), 10); @@ -65,31 +70,43 @@ public class ChannelParserTests { @Test public void testDirectChannelByDefault() throws InterruptedException { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("defaultChannel"); assertEquals(DirectChannel.class, channel.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(channel); + assertThat(accessor.getPropertyValue("dispatcher"), is(LoadBalancingDispatcher.class)); + } + + @Test + public void channelWithRoundRobinDispatcher() throws Exception { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); + MessageChannel channel = (MessageChannel) context.getBean("failOverChannel"); + assertEquals(DirectChannel.class, channel.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(channel); + assertThat(accessor.getPropertyValue("dispatcher"), is(FailOverDispatcher.class)); } @Test public void testPublishSubscribeChannel() throws InterruptedException { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("publishSubscribeChannel"); assertEquals(PublishSubscribeChannel.class, channel.getClass()); } @Test public void testPublishSubscribeChannelWithTaskExecutorReference() throws InterruptedException { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("publishSubscribeChannelWithTaskExecutorRef"); assertEquals(PublishSubscribeChannel.class, channel.getClass()); DirectFieldAccessor accessor = new DirectFieldAccessor(channel); accessor = new DirectFieldAccessor(accessor.getPropertyValue("dispatcher")); Object taskExecutorProperty = accessor.getPropertyValue("taskExecutor"); assertNotNull(taskExecutorProperty); - assertEquals(ErrorHandlingTaskExecutor.class, taskExecutorProperty.getClass()); + assertEquals(ErrorHandlingTaskExecutor.class, taskExecutorProperty.getClass()); DirectFieldAccessor executorAccessor = new DirectFieldAccessor(taskExecutorProperty); TaskExecutor innerExecutor = (TaskExecutor) executorAccessor.getPropertyValue("taskExecutor"); Object taskExecutorBean = context.getBean("taskExecutor"); @@ -98,24 +115,24 @@ public class ChannelParserTests { @Test public void testDatatypeChannelWithCorrectType() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("integerChannel"); assertTrue(channel.send(new GenericMessage(123))); } - @Test(expected=MessageDeliveryException.class) + @Test(expected = MessageDeliveryException.class) public void testDatatypeChannelWithIncorrectType() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("integerChannel"); channel.send(new StringMessage("incorrect type")); } @Test public void testDatatypeChannelWithAssignableSubTypes() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("numberChannel"); assertTrue(channel.send(new GenericMessage(123))); assertTrue(channel.send(new GenericMessage(123.45))); @@ -123,25 +140,25 @@ public class ChannelParserTests { @Test public void testMultipleDatatypeChannelWithCorrectTypes() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("stringOrNumberChannel"); assertTrue(channel.send(new GenericMessage(123))); assertTrue(channel.send(new StringMessage("accepted type"))); } - @Test(expected=MessageDeliveryException.class) + @Test(expected = MessageDeliveryException.class) public void testMultipleDatatypeChannelWithIncorrectType() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "channelParserTests.xml", this.getClass()); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("channelParserTests.xml", this + .getClass()); MessageChannel channel = (MessageChannel) context.getBean("stringOrNumberChannel"); channel.send(new GenericMessage(true)); } @Test public void testChannelInteceptorRef() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "channelInterceptorParserTests.xml", this.getClass()); + ApplicationContext context = new ClassPathXmlApplicationContext("channelInterceptorParserTests.xml", this + .getClass()); PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorRef"); TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor"); assertEquals(0, interceptor.getSendCount()); @@ -154,8 +171,8 @@ public class ChannelParserTests { @Test public void testChannelInteceptorInnerBean() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "channelInterceptorParserTests.xml", this.getClass()); + ApplicationContext context = new ClassPathXmlApplicationContext("channelInterceptorParserTests.xml", this + .getClass()); PollableChannel channel = (PollableChannel) context.getBean("channelWithInterceptorInnerBean"); channel.send(new StringMessage("test")); Message transformed = channel.receive(1000); @@ -164,15 +181,14 @@ public class ChannelParserTests { @Test public void testPriorityChannelWithDefaultComparator() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "priorityChannelParserTests.xml", this.getClass()); + ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this + .getClass()); PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithDefaultComparator"); - Message lowPriorityMessage = MessageBuilder.withPayload("low") - .setPriority(MessagePriority.LOW).build(); - Message midPriorityMessage = MessageBuilder.withPayload("mid") - .setPriority(MessagePriority.NORMAL).build(); - Message highPriorityMessage = MessageBuilder.withPayload("high") - .setPriority(MessagePriority.HIGH).build(); + Message lowPriorityMessage = MessageBuilder.withPayload("low").setPriority(MessagePriority.LOW).build(); + Message midPriorityMessage = MessageBuilder.withPayload("mid").setPriority(MessagePriority.NORMAL) + .build(); + Message highPriorityMessage = MessageBuilder.withPayload("high").setPriority(MessagePriority.HIGH) + .build(); channel.send(lowPriorityMessage); channel.send(highPriorityMessage); channel.send(midPriorityMessage); @@ -186,8 +202,8 @@ public class ChannelParserTests { @Test public void testPriorityChannelWithCustomComparator() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "priorityChannelParserTests.xml", this.getClass()); + ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this + .getClass()); PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithCustomComparator"); channel.send(new StringMessage("C")); channel.send(new StringMessage("A")); @@ -205,8 +221,8 @@ public class ChannelParserTests { @Test public void testPriorityChannelWithIntegerDatatypeEnforced() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "priorityChannelParserTests.xml", this.getClass()); + ApplicationContext context = new ClassPathXmlApplicationContext("priorityChannelParserTests.xml", this + .getClass()); PollableChannel channel = (PollableChannel) context.getBean("integerOnlyPriorityChannel"); channel.send(new GenericMessage(3)); channel.send(new GenericMessage(2)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml index 8c1640fe78..a9eaa7949a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/channelParserTests.xml @@ -1,35 +1,37 @@ - + http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - + - - - + + + + + + task-executor="taskExecutor" /> - + - + - + - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java similarity index 92% rename from org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java rename to org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java index 6f3567143d..ecc3ee4b70 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/FailOverDispatcherTests.java @@ -37,11 +37,11 @@ import org.springframework.integration.message.TestHandlers; /** * @author Mark Fisher */ -public class SimpleDispatcherTests { +public class FailOverDispatcherTests { @Test public void singleMessage() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final CountDownLatch latch = new CountDownLatch(1); dispatcher.addHandler(createConsumer(TestHandlers.countDownHandler(latch))); dispatcher.dispatch(new StringMessage("test")); @@ -51,7 +51,7 @@ public class SimpleDispatcherTests { @Test public void pointToPoint() throws InterruptedException { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final CountDownLatch latch = new CountDownLatch(1); final AtomicInteger counter1 = new AtomicInteger(); final AtomicInteger counter2 = new AtomicInteger(); @@ -65,7 +65,7 @@ public class SimpleDispatcherTests { @Test public void noDuplicateSubscriptions() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target = new CountingTestEndpoint(counter, false); dispatcher.addHandler(target); @@ -81,7 +81,7 @@ public class SimpleDispatcherTests { @Test public void removeConsumerBeforeSend() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target1 = new CountingTestEndpoint(counter, false); MessageHandler target2 = new CountingTestEndpoint(counter, false); @@ -101,7 +101,7 @@ public class SimpleDispatcherTests { @Test public void removeConsumerBetweenSends() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target1 = new CountingTestEndpoint(counter, false); MessageHandler target2 = new CountingTestEndpoint(counter, false); @@ -136,7 +136,7 @@ public class SimpleDispatcherTests { @Test(expected = MessageDeliveryException.class) public void removeConsumerLastTargetCausesDeliveryException() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target = new CountingTestEndpoint(counter, false); dispatcher.addHandler(target); @@ -153,7 +153,7 @@ public class SimpleDispatcherTests { @Test public void firstHandlerReturnsTrue() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target1 = new CountingTestEndpoint(counter, true); MessageHandler target2 = new CountingTestEndpoint(counter, false); @@ -167,7 +167,7 @@ public class SimpleDispatcherTests { @Test public void middleHandlerReturnsTrue() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target1 = new CountingTestEndpoint(counter, false); MessageHandler target2 = new CountingTestEndpoint(counter, true); @@ -181,7 +181,7 @@ public class SimpleDispatcherTests { @Test public void allHandlersReturnFalse() { - SimpleDispatcher dispatcher = new SimpleDispatcher(); + FailOverDispatcher dispatcher = new FailOverDispatcher(); final AtomicInteger counter = new AtomicInteger(); MessageHandler target1 = new CountingTestEndpoint(counter, false); MessageHandler target2 = new CountingTestEndpoint(counter, false);