diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java
index 7a53569f96..5d9efc2514 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PointToPointChannelParser.java
@@ -16,6 +16,8 @@
package org.springframework.integration.config.xml;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Element;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
@@ -37,6 +39,9 @@ public class PointToPointChannelParser extends AbstractChannelParser {
private static final String DISPATCHER_PACKAGE = IntegrationNamespaceUtils.BASE_PACKAGE + ".dispatcher";
+ private final Log logger = LogFactory.getLog(this.getClass());
+
+
@Override
protected BeanDefinitionBuilder buildBeanDefinition(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder = null;
@@ -64,16 +69,53 @@ public class PointToPointChannelParser extends AbstractChannelParser {
builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".RendezvousChannel");
}
- // verify that the 'task-executor' is not provided if a queue sub-element exists
- String taskExecutor = element.getAttribute("task-executor");
- if (queueElement != null && StringUtils.hasText(taskExecutor)) {
- parserContext.getReaderContext().error("The 'task-executor' attribute " +
+ Element dispatcherElement = DomUtils.getChildElementByTagName(element, "dispatcher");
+
+ // check for the dispatcher attribute (deprecated)
+ String dispatcherAttribute = element.getAttribute("dispatcher");
+ boolean hasDispatcherAttribute = StringUtils.hasText(dispatcherAttribute);
+ if (hasDispatcherAttribute && logger.isWarnEnabled()) {
+ logger.warn("The 'dispatcher' attribute on the 'channel' element is deprecated. " +
+ "Please use the 'dispatcher' sub-element instead.");
+ }
+
+ // verify that a dispatcher is not provided if a queue sub-element exists
+ if (queueElement != null && (dispatcherElement != null || hasDispatcherAttribute)) {
+ parserContext.getReaderContext().error("The 'dispatcher' attribute or sub-element " +
"and any queue sub-element are mutually exclusive.", element);
return null;
}
- if (builder == null) {
+ if (queueElement != null) {
+ return builder;
+ }
+
+ if (dispatcherElement != null && hasDispatcherAttribute) {
+ parserContext.getReaderContext().error("The 'dispatcher' attribute and 'dispatcher' " +
+ "sub-element are mutually exclusive. NOTE: the attribute is DEPRECATED. " +
+ "Please use the dispatcher sub-element instead.", element);
+ return null;
+ }
+
+ if (hasDispatcherAttribute) {
+ // this attribute is deprecated, but if set, we need to create a DirectChannel
+ // without any LoadBalancerStrategy and the failover flag set to true (default).
+ builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".DirectChannel");
+ if (!"failover".equals(dispatcherAttribute)) {
+ // round-robin dispatcher is used by default, the "failover" value simply disables it
+ builder.addConstructorArgValue(new RootBeanDefinition(
+ DISPATCHER_PACKAGE + ".RoundRobinLoadBalancingStrategy", null, null));
+ }
+ }
+ else if (dispatcherElement == null) {
+ // configure the default DirectChannel with a RoundRobinLoadBalancingStrategy
+ builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".DirectChannel");
+ builder.addConstructorArgValue(new RootBeanDefinition(
+ DISPATCHER_PACKAGE + ".RoundRobinLoadBalancingStrategy", null, null));
+ }
+ else {
// configure either an ExecutorChannel or DirectChannel based on existence of 'task-executor'
+ String taskExecutor = dispatcherElement.getAttribute("task-executor");
if (StringUtils.hasText(taskExecutor)) {
builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".ExecutorChannel");
builder.addConstructorArgReference(taskExecutor);
@@ -81,15 +123,15 @@ public class PointToPointChannelParser extends AbstractChannelParser {
else {
builder = BeanDefinitionBuilder.genericBeanDefinition(CHANNEL_PACKAGE + ".DirectChannel");
}
- // this attribute is deprecated, but if set, we need to create a UnicastingDispatcher
- // without any LoadBalancerStrategy and the failover flag set to true (default).
- String dispatcherAttribute = element.getAttribute("dispatcher");
- if (!"failover".equals(dispatcherAttribute)) {
- // round-robin dispatcher by default, but TODO first we need to check for the dispatcher element.
+ // unless the 'load-balancer' attribute is explicitly set to 'none',
+ // configure the default RoundRobinLoadBalancingStrategy
+ String loadBalancer = dispatcherElement.getAttribute("load-balancer");
+ if (!"none".equals(loadBalancer)) {
builder.addConstructorArgValue(new RootBeanDefinition(
DISPATCHER_PACKAGE + ".RoundRobinLoadBalancingStrategy", null, null));
}
- }
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, dispatcherElement, "failover");
+ }
return builder;
}
diff --git a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd
index 89c0fa8e4d..ccd6a5c825 100644
--- a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd
+++ b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd
@@ -64,25 +64,11 @@
+
-
-
-
-
-
-
-
-
-
-
@@ -133,6 +119,73 @@
+
+
+
+ Defines the dispatching configuration for a non-buffering channel
+ (i.e. one without a queue).
+
+
+
+
+
+
+ Defines a load-balancing strategy for the channel's dispatcher.
+ The default is a round-robin load balancer.
+
+
+
+
+
+
+ [DEFAULT] Defines a Round Robin dispatching strategy which allows for
+ load balancing of messages between multiple Message Handlers. Which message
+ handler receives the message first is determined by the 'order' attribute
+ of such Message Handler.
+
+
+
+
+
+
+ No LoadBalancingStrategy will be used.
+
+
+
+
+
+
+
+
+
+ Specifies whether this dispatcher has failover enabled. By default,
+ failover will be enabled. Set this to 'false' to disable it.
+ When enabled and message delivery to the primary Message Handler fails,
+ an attempt will be made to deliver the message to the next handler and so on...
+ Primary, secondary etc... is determined by the load-balancing strategy in use
+ (e.g. round-robin). If no load-balancer strategy is configured, the order will
+ be fixed in a sequence determined by the 'order' attribute on the Message Handlers
+ (or the @Ordered annotation on adapted methods).
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
@@ -215,31 +268,15 @@
- Defines a dispatching strategy for the channel.
- The default is a round
- robin load balancer. In case of a publish
- subscribe channel this
- attribute will be ignored.
+ This attribute is DEPRECATED. Please use the dispatcher sub-element instead.
- Defines a fail over dispatching strategy.
- If message delivery to the primary Message Handler failed an
- attempt will be made to deliver the message to the secondary and so on...
- Primary, secondary etc... is determined by the 'order' attribute on the Message Handlers.
-
-
-
-
-
-
- [DEFAULT] Defines a Round Robin dispatching strategy which allows for
- load balancing of messages between multiple Message Handlers. Which message
- handler receives the message first is determined by the 'order' attribute
- of such Message Handler.
+ Enables failover, but disables load-balancing.
+ See the dispatcher sub-element for more information.
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests-context.xml
new file mode 100644
index 0000000000..fc6ff39c62
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests-context.xml
@@ -0,0 +1,38 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java
new file mode 100644
index 0000000000..809deb09cc
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/DispatchingChannelParserTests.java
@@ -0,0 +1,132 @@
+/*
+ * Copyright 2002-2009 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.channel.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.beans.FatalBeanException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.ExecutorChannel;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * @author Mark Fisher
+ * @since 1.0.3
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class DispatchingChannelParserTests {
+
+ @Autowired
+ private ApplicationContext context;
+
+ @Autowired
+ private Map channels;
+
+
+ @Test(expected = FatalBeanException.class)
+ public void dispatcherAttributeAndSubElement() {
+ new ClassPathXmlApplicationContext("dispatcherAttributeAndSubElement.xml", this.getClass());
+ }
+
+ @Test
+ public void dispatcherAttribute() {
+ MessageChannel channel = channels.get("dispatcherAttribute");
+ assertEquals(DirectChannel.class, channel.getClass());
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertNull(getDispatcherProperty("loadBalancingStrategy", channel));
+ }
+
+ @Test
+ public void taskExecutorOnly() {
+ MessageChannel channel = channels.get("taskExecutorOnly");
+ assertEquals(ExecutorChannel.class, channel.getClass());
+ assertSame(context.getBean("taskExecutor"), getDispatcherProperty("taskExecutor", channel));
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertEquals(RoundRobinLoadBalancingStrategy.class,
+ getDispatcherProperty("loadBalancingStrategy", channel).getClass());
+ }
+
+ @Test
+ public void failoverFalse() {
+ MessageChannel channel = channels.get("failoverFalse");
+ assertEquals(DirectChannel.class, channel.getClass());
+ assertFalse((Boolean) getDispatcherProperty("failover", channel));
+ assertEquals(RoundRobinLoadBalancingStrategy.class,
+ getDispatcherProperty("loadBalancingStrategy", channel).getClass());
+ }
+
+ @Test
+ public void failoverTrue() {
+ MessageChannel channel = channels.get("failoverTrue");
+ assertEquals(DirectChannel.class, channel.getClass());
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertEquals(RoundRobinLoadBalancingStrategy.class,
+ getDispatcherProperty("loadBalancingStrategy", channel).getClass());
+ }
+
+ @Test
+ public void loadBalancerDisabled() {
+ MessageChannel channel = channels.get("loadBalancerDisabled");
+ assertEquals(DirectChannel.class, channel.getClass());
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertNull(getDispatcherProperty("loadBalancingStrategy", channel));
+ }
+
+ @Test
+ public void loadBalancerDisabledAndTaskExecutor() {
+ MessageChannel channel = channels.get("loadBalancerDisabledAndTaskExecutor");
+ assertEquals(ExecutorChannel.class, channel.getClass());
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertNull(getDispatcherProperty("loadBalancingStrategy", channel));
+ assertSame(context.getBean("taskExecutor"), getDispatcherProperty("taskExecutor", channel));
+ }
+
+ @Test
+ public void roundRobinLoadBalancerAndTaskExecutor() {
+ MessageChannel channel = channels.get("roundRobinLoadBalancerAndTaskExecutor");
+ assertEquals(ExecutorChannel.class, channel.getClass());
+ assertTrue((Boolean) getDispatcherProperty("failover", channel));
+ assertEquals(RoundRobinLoadBalancingStrategy.class,
+ getDispatcherProperty("loadBalancingStrategy", channel).getClass());
+ assertSame(context.getBean("taskExecutor"), getDispatcherProperty("taskExecutor", channel));
+ }
+
+
+ private static Object getDispatcherProperty(String propertyName, MessageChannel channel) {
+ return new DirectFieldAccessor(
+ new DirectFieldAccessor(channel).getPropertyValue("dispatcher"))
+ .getPropertyValue(propertyName);
+ }
+
+}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/dispatcherAttributeAndSubElement.xml b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/dispatcherAttributeAndSubElement.xml
new file mode 100644
index 0000000000..42b6366132
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/dispatcherAttributeAndSubElement.xml
@@ -0,0 +1,13 @@
+
+
+
+
+
+
+
+