diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml
index 1165fe9c43..7ef3e48122 100644
--- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml
+++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml
@@ -7,7 +7,6 @@
http://www.springframework.org/schema/integration-xml http://www.springframework.org/schema/integration/spring-integration-xml-1.0.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
-
@@ -23,11 +22,11 @@
-
+
-
+
getBeanClass(Element element) {
+ return this.getEndpointClass();
+ }
+
+ @Override
+ protected boolean shouldGenerateId() {
+ return false;
+ }
+
+ @Override
+ protected boolean shouldGenerateIdAsFallback() {
+ return true;
+ }
+
+ @Override
+ protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
+ String ref = element.getAttribute(REF_ATTRIBUTE);
+ if (!StringUtils.hasText(ref)) {
+ throw new ConfigurationException("The '" + REF_ATTRIBUTE + "' attribute is required.");
+ }
+ if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) {
+ String method = element.getAttribute(METHOD_ATTRIBUTE);
+ String adapterBeanName = this.parseAdapter(ref, method, element, parserContext);
+ builder.addConstructorArgReference(adapterBeanName);
+ }
+ else {
+ builder.addConstructorArgReference(ref);
+ }
+ String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE);
+ if (!StringUtils.hasText(inputChannel)) {
+ throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required");
+ }
+ Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT);
+ if (pollerElement != null) {
+ String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext);
+ builder.addPropertyReference("source", pollerBeanName);
+ }
+ else {
+ builder.addPropertyValue("inputChannelName", inputChannel);
+ }
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(
+ builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target");
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE);
+ }
+
+ private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) {
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(this.getMethodInvokingAdapterClass());
+ builder.addConstructorArgReference(ref);
+ builder.addConstructorArgValue(method);
+ String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName(builder.getBeanDefinition(), parserContext.getRegistry());
+ BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName);
+ parserContext.registerBeanComponent(new BeanComponentDefinition(holder));
+ return adapterBeanName;
+ }
+
+ protected abstract Class extends MessageEndpoint> getEndpointClass();
+
+ protected abstract Class> getMethodInvokingAdapterClass();
+
+}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java
index 1aaac09eb0..0ef4e0dbb9 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java
@@ -16,19 +16,25 @@
package org.springframework.integration.config;
-import org.springframework.integration.handler.MessageHandler;
-import org.springframework.integration.router.RouterMessageHandler;
+import org.springframework.integration.endpoint.MessageEndpoint;
+import org.springframework.integration.router.MethodInvokingRouter;
+import org.springframework.integration.router.RouterEndpoint;
/**
* Parser for the <router/> element.
*
* @author Mark Fisher
*/
-public class RouterParser extends AbstractMessageEndpointParser {
+public class RouterParser extends AbstractEndpointParser {
@Override
- protected Class extends MessageHandler> getHandlerAdapterClass() {
- return RouterMessageHandler.class;
+ protected Class extends MessageEndpoint> getEndpointClass() {
+ return RouterEndpoint.class;
+ }
+
+ @Override
+ protected Class> getMethodInvokingAdapterClass() {
+ return MethodInvokingRouter.class;
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java
index f61ff6bd32..34d77252f7 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java
@@ -16,94 +16,25 @@
package org.springframework.integration.config;
-import org.w3c.dom.Element;
-
-import org.springframework.beans.factory.config.BeanDefinitionHolder;
-import org.springframework.beans.factory.parsing.BeanComponentDefinition;
-import org.springframework.beans.factory.support.BeanDefinitionBuilder;
-import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
-import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser;
-import org.springframework.beans.factory.xml.ParserContext;
-import org.springframework.integration.ConfigurationException;
+import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.splitter.MethodInvokingSplitter;
import org.springframework.integration.splitter.SplitterEndpoint;
-import org.springframework.util.StringUtils;
-import org.springframework.util.xml.DomUtils;
/**
* Parser for the <splitter/> element.
*
* @author Mark Fisher
*/
-public class SplitterParser extends AbstractSingleBeanDefinitionParser {
-
- protected static final String REF_ATTRIBUTE = "ref";
-
- protected static final String METHOD_ATTRIBUTE = "method";
-
- protected static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel";
-
- protected static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel";
-
- private static final String POLLER_ELEMENT = "poller";
-
- private static final String ERROR_HANDLER_ATTRIBUTE = "error-handler";
-
+public class SplitterParser extends AbstractEndpointParser {
@Override
- protected Class> getBeanClass(Element element) {
+ protected Class extends MessageEndpoint> getEndpointClass() {
return SplitterEndpoint.class;
}
@Override
- protected boolean shouldGenerateId() {
- return false;
- }
-
- @Override
- protected boolean shouldGenerateIdAsFallback() {
- return true;
- }
-
- @Override
- protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
- String ref = element.getAttribute(REF_ATTRIBUTE);
- if (!StringUtils.hasText(ref)) {
- throw new ConfigurationException("The '" + REF_ATTRIBUTE + "' attribute is required.");
- }
- if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) {
- String method = element.getAttribute(METHOD_ATTRIBUTE);
- String adapterBeanName = this.parseAdapter(ref, method, element, parserContext);
- builder.addConstructorArgReference(adapterBeanName);
- }
- else {
- builder.addConstructorArgReference(ref);
- }
- String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE);
- if (!StringUtils.hasText(inputChannel)) {
- throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required");
- }
- Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT);
- if (pollerElement != null) {
- String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext);
- builder.addPropertyReference("source", pollerBeanName);
- }
- else {
- builder.addPropertyValue("inputChannelName", inputChannel);
- }
- IntegrationNamespaceUtils.setReferenceIfAttributeDefined(
- builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target");
- IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE);
- }
-
- private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) {
- BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingSplitter.class);
- builder.addConstructorArgReference(ref);
- builder.addConstructorArgValue(method);
- String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName(builder.getBeanDefinition(), parserContext.getRegistry());
- BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName);
- parserContext.registerBeanComponent(new BeanComponentDefinition(holder));
- return adapterBeanName;
+ protected Class> getMethodInvokingAdapterClass() {
+ return MethodInvokingSplitter.class;
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java
new file mode 100644
index 0000000000..ca1ddd1370
--- /dev/null
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.router;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.integration.channel.ChannelRegistry;
+import org.springframework.integration.channel.ChannelRegistryAware;
+import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageDeliveryException;
+import org.springframework.integration.message.MessageExchangeTemplate;
+import org.springframework.integration.message.MessageTarget;
+import org.springframework.integration.message.MessagingException;
+
+/**
+ * Base class for message router implementations.
+ *
+ * @author Mark Fisher
+ */
+public abstract class AbstractRouter implements Router, ChannelRegistryAware {
+
+ protected final Log logger = LogFactory.getLog(this.getClass());
+
+ private volatile ChannelRegistry channelRegistry;
+
+ private final MessageExchangeTemplate messageExchangeTemplate = new MessageExchangeTemplate();
+
+
+ public void setChannelRegistry(ChannelRegistry channelRegistry) {
+ this.channelRegistry = channelRegistry;
+ }
+
+ protected ChannelRegistry getChannelRegistry() {
+ return this.channelRegistry;
+ }
+
+ public final boolean route(Message> message) {
+ Collection> results = this.resolveChannels(message);
+ if (results == null || results.isEmpty()) {
+ return false;
+ }
+ boolean sent = false;
+ for (Object channelOrName : results) {
+ MessageTarget target = null;
+ if (channelOrName == null) {
+ continue;
+ }
+ if (channelOrName instanceof MessageTarget) {
+ target = (MessageTarget) channelOrName;
+ }
+ else if (channelOrName instanceof String) {
+ if (this.channelRegistry == null) {
+ throw new MessagingException(message, "router has no ChannelRegistry");
+ }
+ target = this.channelRegistry.lookupChannel((String) channelOrName);
+ }
+ else {
+ throw new MessagingException(message, "unsupported return type for router [" + channelOrName.getClass() + "]");
+ }
+ if (target == null) {
+ throw new MessageDeliveryException(message, "unable to resolve channel for '" + channelOrName + "'");
+ }
+ this.messageExchangeTemplate.send(message, target);
+ sent = true;
+ }
+ return sent;
+ }
+
+ /**
+ * Subclasses must implement this method to return 0 or more MessageChannel
+ * instances or channel names to which the given Message should be routed.
+ */
+ protected abstract Collection> resolveChannels(Message> message);
+
+}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java
deleted file mode 100644
index 78e945af20..0000000000
--- a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.router;
-
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.springframework.beans.factory.InitializingBean;
-import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.channel.ChannelRegistry;
-import org.springframework.integration.channel.ChannelRegistryAware;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.handler.MessageHandler;
-import org.springframework.integration.message.Message;
-import org.springframework.integration.message.MessageHandlingException;
-
-/**
- * Base class for message router implementations.
- *
- * @author Mark Fisher
- */
-public abstract class AbstractRoutingMessageHandler implements MessageHandler, ChannelRegistryAware, InitializingBean {
-
- protected Log logger = LogFactory.getLog(this.getClass());
-
- private ChannelRegistry channelRegistry;
-
- private boolean resolutionRequired = false;
-
- private long timeout = -1;
-
-
- /**
- * Set whether this router should always be required to resolve at least one
- * channel. The default is 'false'. To trigger an exception whenever the
- * resolver returns null or an empty channel list, set this value to 'true'.
- */
- public void setResolutionRequired(boolean resolutionRequired) {
- this.resolutionRequired = resolutionRequired;
- }
-
- /**
- * Set the timeout for sending a message to the resolved channel. By
- * default, there is no timeout, meaning the send will block indefinitely.
- */
- public void setTimeout(long timeout) {
- this.timeout = timeout;
- }
-
- public void setChannelRegistry(ChannelRegistry channelRegistry) {
- this.channelRegistry = channelRegistry;
- }
-
- protected ChannelRegistry getChannelRegistry() {
- return this.channelRegistry;
- }
-
- public final void afterPropertiesSet() {
- this.validate();
- }
-
- public final Message> handle(Message> message) {
- List channels = this.resolveChannels(message);
- if (channels == null || channels.size() == 0) {
- String description = "failed to resolve any channel for message";
- if (this.resolutionRequired) {
- throw new MessageHandlingException(message, description);
- }
- if (logger.isWarnEnabled()) {
- logger.warn(description);
- }
- return null;
- }
- for (MessageChannel channel : channels) {
- this.sendMesage(message, channel);
- }
- return null;
- }
-
- private void sendMesage(Message> message, MessageChannel channel) {
- boolean sent = false;
- if (timeout < 0) {
- sent = channel.send(message);
- }
- else {
- sent = channel.send(message, timeout);
- }
- if (!sent) {
- throw new MessageHandlingException(message,
- "failed to send message to channel '" + channel.getName() + "'");
- }
- }
-
- protected abstract void validate() throws ConfigurationException;
-
- protected abstract List resolveChannels(Message> message);
-
-}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java
new file mode 100644
index 0000000000..2831a8111f
--- /dev/null
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.router;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.integration.ConfigurationException;
+import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageMappingMethodInvoker;
+
+/**
+ * A {@link Router} implementation that invokes the specified method
+ * on the given object. The method's return value may be a single
+ * MessageChannel instance, a single String to be interpreted as
+ * a channel name, or a Collection (or Array) of either type.
+ *
+ * @author Mark Fisher
+ */
+public class MethodInvokingRouter extends AbstractRouter implements InitializingBean {
+
+ private final MessageMappingMethodInvoker invoker;
+
+
+ public MethodInvokingRouter(Object object, Method method) {
+ this.invoker = new MessageMappingMethodInvoker(object, method);
+ }
+
+ public MethodInvokingRouter(Object object, String methodName) {
+ this.invoker = new MessageMappingMethodInvoker(object, methodName);
+ }
+
+
+ public void afterPropertiesSet() throws Exception {
+ this.invoker.afterPropertiesSet();
+ }
+
+ @Override
+ protected Collection> resolveChannels(Message> message) {
+ Object result = this.invoker.invokeMethod(message);
+ if (result == null) {
+ return null;
+ }
+ List