From ba9c086aeaba5096e1c4c8aabbe821eca481b291 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Thu, 28 Aug 2008 18:46:36 +0000 Subject: [PATCH] Added Router strategy interface, and refactored Message-routing support in general to avoid MessageHandler. --- .../config/XPathRouterParserTests-context.xml | 5 +- .../config/AbstractEndpointParser.java | 112 ++++++ .../integration/config/RouterParser.java | 16 +- .../integration/config/SplitterParser.java | 79 +--- .../integration/router/AbstractRouter.java | 92 +++++ .../router/AbstractRoutingMessageHandler.java | 114 ------ .../router/MethodInvokingRouter.java | 86 +++++ .../router/MultiChannelRouter.java | 44 +-- .../integration/router/Router.java | 30 ++ .../integration/router/RouterEndpoint.java | 107 ++++++ .../router/RouterMessageHandler.java | 81 +---- .../router/SingleChannelRouter.java | 43 +-- ...ts.java => MethodInvokingRouterTests.java} | 338 +++++++++--------- .../router/MultiChannelRouterTests.java | 94 +---- .../router/PayloadTypeRouterTests.java | 8 +- .../router/RecipientListRouterTests.java | 16 +- .../RootCauseErrorMessageRouterTests.java | 19 +- .../router/RouterEndpointTests.java | 198 ++++++++++ .../router/SingleChannelRouterTests.java | 83 +---- 19 files changed, 900 insertions(+), 665 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java delete mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/router/Router.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java rename org.springframework.integration/src/test/java/org/springframework/integration/router/{RouterMessageHandlerTests.java => MethodInvokingRouterTests.java} (59%) create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml index 1165fe9c43..7ef3e48122 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml @@ -7,7 +7,6 @@ http://www.springframework.org/schema/integration-xml http://www.springframework.org/schema/integration/spring-integration-xml-1.0.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - @@ -23,11 +22,11 @@ - + - + getBeanClass(Element element) { + return this.getEndpointClass(); + } + + @Override + protected boolean shouldGenerateId() { + return false; + } + + @Override + protected boolean shouldGenerateIdAsFallback() { + return true; + } + + @Override + protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + String ref = element.getAttribute(REF_ATTRIBUTE); + if (!StringUtils.hasText(ref)) { + throw new ConfigurationException("The '" + REF_ATTRIBUTE + "' attribute is required."); + } + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + String method = element.getAttribute(METHOD_ATTRIBUTE); + String adapterBeanName = this.parseAdapter(ref, method, element, parserContext); + builder.addConstructorArgReference(adapterBeanName); + } + else { + builder.addConstructorArgReference(ref); + } + String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE); + if (!StringUtils.hasText(inputChannel)) { + throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required"); + } + Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); + if (pollerElement != null) { + String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext); + builder.addPropertyReference("source", pollerBeanName); + } + else { + builder.addPropertyValue("inputChannelName", inputChannel); + } + IntegrationNamespaceUtils.setReferenceIfAttributeDefined( + builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); + } + + private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(this.getMethodInvokingAdapterClass()); + builder.addConstructorArgReference(ref); + builder.addConstructorArgValue(method); + String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName(builder.getBeanDefinition(), parserContext.getRegistry()); + BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName); + parserContext.registerBeanComponent(new BeanComponentDefinition(holder)); + return adapterBeanName; + } + + protected abstract Class getEndpointClass(); + + protected abstract Class getMethodInvokingAdapterClass(); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java index 1aaac09eb0..0ef4e0dbb9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java @@ -16,19 +16,25 @@ package org.springframework.integration.config; -import org.springframework.integration.handler.MessageHandler; -import org.springframework.integration.router.RouterMessageHandler; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.router.MethodInvokingRouter; +import org.springframework.integration.router.RouterEndpoint; /** * Parser for the <router/> element. * * @author Mark Fisher */ -public class RouterParser extends AbstractMessageEndpointParser { +public class RouterParser extends AbstractEndpointParser { @Override - protected Class getHandlerAdapterClass() { - return RouterMessageHandler.class; + protected Class getEndpointClass() { + return RouterEndpoint.class; + } + + @Override + protected Class getMethodInvokingAdapterClass() { + return MethodInvokingRouter.class; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java index f61ff6bd32..34d77252f7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java @@ -16,94 +16,25 @@ package org.springframework.integration.config; -import org.w3c.dom.Element; - -import org.springframework.beans.factory.config.BeanDefinitionHolder; -import org.springframework.beans.factory.parsing.BeanComponentDefinition; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; -import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.ConfigurationException; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.splitter.MethodInvokingSplitter; import org.springframework.integration.splitter.SplitterEndpoint; -import org.springframework.util.StringUtils; -import org.springframework.util.xml.DomUtils; /** * Parser for the <splitter/> element. * * @author Mark Fisher */ -public class SplitterParser extends AbstractSingleBeanDefinitionParser { - - protected static final String REF_ATTRIBUTE = "ref"; - - protected static final String METHOD_ATTRIBUTE = "method"; - - protected static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; - - protected static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; - - private static final String POLLER_ELEMENT = "poller"; - - private static final String ERROR_HANDLER_ATTRIBUTE = "error-handler"; - +public class SplitterParser extends AbstractEndpointParser { @Override - protected Class getBeanClass(Element element) { + protected Class getEndpointClass() { return SplitterEndpoint.class; } @Override - protected boolean shouldGenerateId() { - return false; - } - - @Override - protected boolean shouldGenerateIdAsFallback() { - return true; - } - - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - String ref = element.getAttribute(REF_ATTRIBUTE); - if (!StringUtils.hasText(ref)) { - throw new ConfigurationException("The '" + REF_ATTRIBUTE + "' attribute is required."); - } - if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { - String method = element.getAttribute(METHOD_ATTRIBUTE); - String adapterBeanName = this.parseAdapter(ref, method, element, parserContext); - builder.addConstructorArgReference(adapterBeanName); - } - else { - builder.addConstructorArgReference(ref); - } - String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE); - if (!StringUtils.hasText(inputChannel)) { - throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required"); - } - Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); - if (pollerElement != null) { - String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext); - builder.addPropertyReference("source", pollerBeanName); - } - else { - builder.addPropertyValue("inputChannelName", inputChannel); - } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined( - builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target"); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); - } - - private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingSplitter.class); - builder.addConstructorArgReference(ref); - builder.addConstructorArgValue(method); - String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName(builder.getBeanDefinition(), parserContext.getRegistry()); - BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName); - parserContext.registerBeanComponent(new BeanComponentDefinition(holder)); - return adapterBeanName; + protected Class getMethodInvokingAdapterClass() { + return MethodInvokingSplitter.class; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java new file mode 100644 index 0000000000..ca1ddd1370 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRouter.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.router; + +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.message.MessageExchangeTemplate; +import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessagingException; + +/** + * Base class for message router implementations. + * + * @author Mark Fisher + */ +public abstract class AbstractRouter implements Router, ChannelRegistryAware { + + protected final Log logger = LogFactory.getLog(this.getClass()); + + private volatile ChannelRegistry channelRegistry; + + private final MessageExchangeTemplate messageExchangeTemplate = new MessageExchangeTemplate(); + + + public void setChannelRegistry(ChannelRegistry channelRegistry) { + this.channelRegistry = channelRegistry; + } + + protected ChannelRegistry getChannelRegistry() { + return this.channelRegistry; + } + + public final boolean route(Message message) { + Collection results = this.resolveChannels(message); + if (results == null || results.isEmpty()) { + return false; + } + boolean sent = false; + for (Object channelOrName : results) { + MessageTarget target = null; + if (channelOrName == null) { + continue; + } + if (channelOrName instanceof MessageTarget) { + target = (MessageTarget) channelOrName; + } + else if (channelOrName instanceof String) { + if (this.channelRegistry == null) { + throw new MessagingException(message, "router has no ChannelRegistry"); + } + target = this.channelRegistry.lookupChannel((String) channelOrName); + } + else { + throw new MessagingException(message, "unsupported return type for router [" + channelOrName.getClass() + "]"); + } + if (target == null) { + throw new MessageDeliveryException(message, "unable to resolve channel for '" + channelOrName + "'"); + } + this.messageExchangeTemplate.send(message, target); + sent = true; + } + return sent; + } + + /** + * Subclasses must implement this method to return 0 or more MessageChannel + * instances or channel names to which the given Message should be routed. + */ + protected abstract Collection resolveChannels(Message message); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java deleted file mode 100644 index 78e945af20..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/AbstractRoutingMessageHandler.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.router; - -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.InitializingBean; -import org.springframework.integration.ConfigurationException; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.handler.MessageHandler; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageHandlingException; - -/** - * Base class for message router implementations. - * - * @author Mark Fisher - */ -public abstract class AbstractRoutingMessageHandler implements MessageHandler, ChannelRegistryAware, InitializingBean { - - protected Log logger = LogFactory.getLog(this.getClass()); - - private ChannelRegistry channelRegistry; - - private boolean resolutionRequired = false; - - private long timeout = -1; - - - /** - * Set whether this router should always be required to resolve at least one - * channel. The default is 'false'. To trigger an exception whenever the - * resolver returns null or an empty channel list, set this value to 'true'. - */ - public void setResolutionRequired(boolean resolutionRequired) { - this.resolutionRequired = resolutionRequired; - } - - /** - * Set the timeout for sending a message to the resolved channel. By - * default, there is no timeout, meaning the send will block indefinitely. - */ - public void setTimeout(long timeout) { - this.timeout = timeout; - } - - public void setChannelRegistry(ChannelRegistry channelRegistry) { - this.channelRegistry = channelRegistry; - } - - protected ChannelRegistry getChannelRegistry() { - return this.channelRegistry; - } - - public final void afterPropertiesSet() { - this.validate(); - } - - public final Message handle(Message message) { - List channels = this.resolveChannels(message); - if (channels == null || channels.size() == 0) { - String description = "failed to resolve any channel for message"; - if (this.resolutionRequired) { - throw new MessageHandlingException(message, description); - } - if (logger.isWarnEnabled()) { - logger.warn(description); - } - return null; - } - for (MessageChannel channel : channels) { - this.sendMesage(message, channel); - } - return null; - } - - private void sendMesage(Message message, MessageChannel channel) { - boolean sent = false; - if (timeout < 0) { - sent = channel.send(message); - } - else { - sent = channel.send(message, timeout); - } - if (!sent) { - throw new MessageHandlingException(message, - "failed to send message to channel '" + channel.getName() + "'"); - } - } - - protected abstract void validate() throws ConfigurationException; - - protected abstract List resolveChannels(Message message); - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java new file mode 100644 index 0000000000..2831a8111f --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/MethodInvokingRouter.java @@ -0,0 +1,86 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.router; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageMappingMethodInvoker; + +/** + * A {@link Router} implementation that invokes the specified method + * on the given object. The method's return value may be a single + * MessageChannel instance, a single String to be interpreted as + * a channel name, or a Collection (or Array) of either type. + * + * @author Mark Fisher + */ +public class MethodInvokingRouter extends AbstractRouter implements InitializingBean { + + private final MessageMappingMethodInvoker invoker; + + + public MethodInvokingRouter(Object object, Method method) { + this.invoker = new MessageMappingMethodInvoker(object, method); + } + + public MethodInvokingRouter(Object object, String methodName) { + this.invoker = new MessageMappingMethodInvoker(object, methodName); + } + + + public void afterPropertiesSet() throws Exception { + this.invoker.afterPropertiesSet(); + } + + @Override + protected Collection resolveChannels(Message message) { + Object result = this.invoker.invokeMethod(message); + if (result == null) { + return null; + } + List channels = new ArrayList(); + if (result instanceof Collection) { + channels.addAll((Collection) result); + } + else if (result instanceof MessageChannel[]) { + channels.addAll(Arrays.asList((MessageChannel[]) result)); + } + else if (result instanceof String[]) { + channels.addAll(Arrays.asList((String[]) result)); + } + else if (result instanceof MessageChannel) { + channels.add((MessageChannel) result); + } + else if (result instanceof String) { + channels.add(result); + } + else { + throw new ConfigurationException( + "router method must return type 'MessageChannel' or 'String'"); + } + return channels; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/MultiChannelRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/MultiChannelRouter.java index c2bd71f203..fa3addb3bb 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/MultiChannelRouter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/MultiChannelRouter.java @@ -16,29 +16,26 @@ package org.springframework.integration.router; -import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; +import java.util.Collection; +import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.Message; +import org.springframework.util.ObjectUtils; /** - * A router implementation for sending to potentially multiple - * {@link MessageChannel MessageChannels}. Requires either a - * {@link MultiChannelResolver} or {@link MultiChannelNameResolver} strategy - * instance. In the case of the latter, the - * {@link org.springframework.integration.channel.ChannelRegistry} reference - * must also be provided. For convenience, the superclass does implement - * {@link org.springframework.integration.channel.ChannelRegistryAware}. + * A router implementation for sending to potentially multiple {@link MessageChannel MessageChannels}. + * Requires either a {@link MultiChannelResolver} or {@link MultiChannelNameResolver} strategy instance. * * @author Mark Fisher */ -public class MultiChannelRouter extends AbstractRoutingMessageHandler { +public class MultiChannelRouter extends AbstractRouter implements InitializingBean { - private MultiChannelResolver channelResolver; + private volatile MultiChannelResolver channelResolver; - private MultiChannelNameResolver channelNameResolver; + private volatile MultiChannelNameResolver channelNameResolver; public void setChannelResolver(MultiChannelResolver channelResolver) { @@ -49,38 +46,23 @@ public class MultiChannelRouter extends AbstractRoutingMessageHandler { this.channelNameResolver = channelNameResolver; } - @Override - public void validate() { + public void afterPropertiesSet() { if (!(this.channelResolver != null ^ this.channelNameResolver != null)) { throw new ConfigurationException( "exactly one of 'channelResolver' or 'channelNameResolver' must be provided"); } - if (this.channelNameResolver != null && this.getChannelRegistry() == null) { - throw new ConfigurationException("'channelRegistry' is required when resolving by channel name"); - } } @Override - public List resolveChannels(Message message) { + public Collection resolveChannels(Message message) { if (this.channelResolver != null) { return this.channelResolver.resolve(message); } - if (this.channelNameResolver == null || this.getChannelRegistry() == null) { - throw new ConfigurationException("router configuration requires either " - + "a 'channelResolver' or both 'channelNameResolver' and 'channelRegistry'"); - } String[] channelNames = this.channelNameResolver.resolve(message); - if (channelNames == null) { + if (ObjectUtils.isEmpty(channelNames)) { return null; } - List channels = new ArrayList(channelNames.length); - for (String channelName : channelNames) { - MessageChannel channel = this.getChannelRegistry().lookupChannel(channelName); - if (channel != null) { - channels.add(channel); - } - } - return channels; + return Arrays.asList(channelNames); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/Router.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/Router.java new file mode 100644 index 0000000000..89fdcca2f2 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/Router.java @@ -0,0 +1,30 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.router; + +import org.springframework.integration.message.Message; + +/** + * Strategy interface for routing a Message to one or more channels. + * + * @author Mark Fisher + */ +public interface Router { + + boolean route(Message message); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java new file mode 100644 index 0000000000..d6fe4dad79 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java @@ -0,0 +1,107 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.router; + +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.util.Assert; + +/** + * @author Mark Fisher + */ +public class RouterEndpoint extends AbstractEndpoint { + + private final Router router; + + private volatile MessageChannel defaultOutputChannel; + + private volatile boolean resolutionRequired; + + + public RouterEndpoint(Router router) { + Assert.notNull(router, "router must not be null"); + this.router = router; + } + + + @Override + public void setChannelRegistry(ChannelRegistry channelRegistry) { + super.setChannelRegistry(channelRegistry); + if (this.router instanceof ChannelRegistryAware) { + ((ChannelRegistryAware) this.router).setChannelRegistry(channelRegistry); + } + } + + public void setDefaultOutputChannel(MessageChannel defaultOutputChannel) { + this.defaultOutputChannel = defaultOutputChannel; + } + + /** + * Set the timeout for sending a message to the resolved channel. By + * default, there is no timeout, meaning the send will block indefinitely. + */ + public void setTimeout(long timeout) { + this.getMessageExchangeTemplate().setSendTimeout(timeout); + } + + /** + * Set whether this router should always be required to resolve at least one + * channel. The default is 'false'. To trigger an exception whenever the + * resolver returns null or an empty channel list, and this endpoint has + * no 'defaultOutputChannel' configured, set this value to 'true'. + */ + public void setResolutionRequired(boolean resolutionRequired) { + this.resolutionRequired = resolutionRequired; + } + + @Override + protected boolean sendInternal(Message message) { + boolean sent = this.router.route(message); + if (!sent) { + if (this.defaultOutputChannel != null) { + sent = this.getMessageExchangeTemplate().send(message, this.defaultOutputChannel); + } + else if (this.resolutionRequired) { + throw new MessageDeliveryException(message, + "no target resolved by router and no default output channel defined"); + } + } + return sent; + } + + + // TODO: remove these methods after refactoring + + private volatile String inputChannelName; + + public String getInputChannelName() { + return this.inputChannelName; + } + + public void setInputChannelName(String inputChannelName) { + this.inputChannelName = inputChannelName; + } + + public String getOutputChannelName() { + return null; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterMessageHandler.java index bb4c621629..c4f3ef723e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterMessageHandler.java @@ -17,95 +17,40 @@ package org.springframework.integration.router; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import org.springframework.integration.ConfigurationException; -import org.springframework.integration.annotation.Router; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.handler.AbstractMessageHandler; -import org.springframework.integration.message.CompositeMessage; +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; +import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageHandlingException; -import org.springframework.integration.message.MessageTarget; /** * MessageHandler adapter for methods annotated with {@link Router @Router}. * * @author Mark Fisher */ -public class RouterMessageHandler extends AbstractMessageHandler { +public class RouterMessageHandler implements MessageHandler, ChannelRegistryAware { - private volatile MessageChannel defaultChannel; + private final Router router; public RouterMessageHandler(Object object, Method method) { - super(object, method); + this.router = new MethodInvokingRouter(object, method); } public RouterMessageHandler(Object object, String methodName) { - super(object, methodName); - } - - public RouterMessageHandler() { + this.router = new MethodInvokingRouter(object, methodName); } - public void setDefaultChannel(MessageChannel defaultChannel) { - this.defaultChannel = defaultChannel; + public Message handle(Message message) { + this.router.route(message); + return null; } - @Override - protected Message createReplyMessage(Object result, Message requestMessage) { - final List channels = new ArrayList(); - if (result != null) { - if (result instanceof Collection) { - channels.addAll((Collection) result); - } - else if (result instanceof MessageChannel[]) { - channels.addAll(Arrays.asList((MessageChannel[]) result)); - } - else if (result instanceof String[]) { - channels.addAll(Arrays.asList((String[]) result)); - } - else if (result instanceof MessageChannel) { - channels.add((MessageChannel) result); - } - else if (result instanceof String) { - channels.add(result); - } - else { - throw new ConfigurationException( - "router method must return type 'MessageChannel' or 'String'"); - } + public void setChannelRegistry(ChannelRegistry channelRegistry) { + if (this.router instanceof ChannelRegistryAware) { + ((ChannelRegistryAware) this.router).setChannelRegistry(channelRegistry); } - if (channels.size() == 0) { - if (this.defaultChannel != null) { - return MessageBuilder.fromMessage(requestMessage).setNextTarget(this.defaultChannel).build(); - } - return null; - } - List> replies = new ArrayList>(); - for (Object channel : channels) { - MessageBuilder builder = MessageBuilder.fromMessage(requestMessage); - if (channel instanceof MessageTarget) { - builder.setNextTarget((MessageTarget) channel); - } - else if (channel instanceof String) { - builder.setNextTarget((String) channel); - } - replies.add(builder.build()); - } - return new CompositeMessage(replies); - } - - @Override - protected Message postProcessReplyMessage(Message replyMessage, Message requestMessage) { - throw new MessageHandlingException(requestMessage, - "router method must return type 'MessageChannel' or 'String', but a Message was returned: " + replyMessage); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/SingleChannelRouter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/SingleChannelRouter.java index 0e59d65527..32b7c6ddd2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/SingleChannelRouter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/SingleChannelRouter.java @@ -16,9 +16,10 @@ package org.springframework.integration.router; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; +import java.util.Collections; +import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.Message; @@ -26,14 +27,11 @@ import org.springframework.integration.message.Message; /** * A router implementation for sending to at most one {@link MessageChannel}. * Requires either a {@link ChannelResolver} or {@link ChannelNameResolver} - * strategy instance. In the case of the latter, the - * {@link org.springframework.integration.channel.ChannelRegistry} reference - * must also be provided. For convenience, the superclass does implement - * {@link org.springframework.integration.channel.ChannelRegistryAware}. + * strategy instance. * * @author Mark Fisher */ -public class SingleChannelRouter extends AbstractRoutingMessageHandler { +public class SingleChannelRouter extends AbstractRouter implements InitializingBean { private ChannelResolver channelResolver; @@ -48,37 +46,22 @@ public class SingleChannelRouter extends AbstractRoutingMessageHandler { this.channelNameResolver = channelNameResolver; } - @Override - public void validate() { + public void afterPropertiesSet() { if (!(this.channelResolver != null ^ this.channelNameResolver != null)) { throw new ConfigurationException( "exactly one of 'channelResolver' or 'channelNameResolver' must be provided"); } - if (this.channelNameResolver != null && this.getChannelRegistry() == null) { - throw new ConfigurationException("'channelRegistry' is required when resolving by channel name"); - } } @Override - public List resolveChannels(Message message) { - List channels = new ArrayList(); - MessageChannel channel = this.resolveChannel(message); - if (channel != null) { - channels.add(channel); + protected Collection resolveChannels(Message message) { + Object result = (this.channelResolver != null) + ? this.channelResolver.resolve(message) + : this.channelNameResolver.resolve(message); + if (result == null) { + return null; } - return channels; - } - - private MessageChannel resolveChannel(Message message) { - if (this.channelResolver != null) { - return this.channelResolver.resolve(message); - } - if (this.channelNameResolver == null || this.getChannelRegistry() == null) { - throw new ConfigurationException("router configuration requires either " - + "a 'channelResolver' or both 'channelNameResolver' and 'channelRegistry'"); - } - String channelName = this.channelNameResolver.resolve(message); - return this.getChannelRegistry().lookupChannel(channelName); + return Collections.singletonList(result); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java similarity index 59% rename from org.springframework.integration/src/test/java/org/springframework/integration/router/RouterMessageHandlerTests.java rename to org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java index c6689156d5..ba19c4cf2d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterMessageHandlerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java @@ -17,8 +17,10 @@ package org.springframework.integration.router; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.lang.reflect.Method; import java.util.ArrayList; @@ -32,101 +34,117 @@ import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageHandlingException; +import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher */ -public class RouterMessageHandlerTests { +public class MethodInvokingRouterTests { @Test public void channelNameResolutionByPayloadConfiguredByMethodReference() throws Exception { + QueueChannel barChannel = new QueueChannel(); + barChannel.setBeanName("bar-channel"); + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + channelRegistry.registerChannel(barChannel); SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routePayload", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + router.setChannelRegistry(channelRegistry); Message message = new GenericMessage("bar"); - Message reply = handler.handle(message); - assertNotNull(reply); - assertEquals(CompositeMessage.class, reply.getClass()); - List> replyMessages = ((CompositeMessage) reply).getPayload(); - assertEquals(1, replyMessages.size()); - Message replyMessage = replyMessages.get(0); - assertEquals("bar", replyMessage.getPayload()); - assertEquals("bar-channel", replyMessage.getHeaders().getNextTarget()); + assertTrue(router.route(message)); + Message replyMessage = barChannel.receive(); + assertNotNull(replyMessage); + assertEquals(message, replyMessage); } @Test public void channelNameResolutionByPayloadConfiguredByMethodName() { + QueueChannel barChannel = new QueueChannel(); + barChannel.setBeanName("bar-channel"); + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + channelRegistry.registerChannel(barChannel); SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routePayload"); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routePayload"); + router.setChannelRegistry(channelRegistry); Message message = new GenericMessage("bar"); - Message reply = handler.handle(message); - assertNotNull(reply); - assertNotNull(reply); - assertEquals(CompositeMessage.class, reply.getClass()); - List> replyMessages = ((CompositeMessage) reply).getPayload(); - assertEquals(1, replyMessages.size()); - Message replyMessage = replyMessages.get(0); - assertEquals("bar", replyMessage.getPayload()); - assertEquals("bar-channel", replyMessage.getHeaders().getNextTarget()); + assertTrue(router.route(message)); + Message replyMessage = barChannel.receive(); + assertNotNull(replyMessage); + assertEquals(message, replyMessage); } @Test public void channelNameResolutionByHeader() throws Exception { + QueueChannel fooChannel = new QueueChannel(); + QueueChannel barChannel = new QueueChannel(); + fooChannel.setBeanName("foo-channel"); + barChannel.setBeanName("bar-channel"); + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + channelRegistry.registerChannel(fooChannel); + channelRegistry.registerChannel(barChannel); SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routeByHeader", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + router.setChannelRegistry(channelRegistry); Message message = MessageBuilder.fromPayload("bar") .setHeader("targetChannel", "foo").build(); - Message reply = handler.handle(message); - assertNotNull(reply); - assertEquals(CompositeMessage.class, reply.getClass()); - List> replyMessages = ((CompositeMessage) reply).getPayload(); - assertEquals(1, replyMessages.size()); - Message replyMessage = replyMessages.get(0); - assertEquals("bar", replyMessage.getPayload()); - assertEquals("foo-channel", replyMessage.getHeaders().getNextTarget()); + assertTrue(router.route(message)); + Message fooReply = fooChannel.receive(0); + Message barReply = barChannel.receive(0); + assertNotNull(fooReply); + assertNull(barReply); + assertEquals(message, fooReply); } - @Test(expected=MessageHandlingException.class) - public void failsWhenRequireddHeaderIsNotProvided() throws Exception { + @Test(expected = MessagingException.class) + public void failsWhenRequiredHeaderIsNotProvided() throws Exception { SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routeByHeader", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - handler.handle(new GenericMessage("testing")); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + router.route(new GenericMessage("testing")); } @Test public void channelNameResolutionByMessageConfiguredByMethodReference() throws Exception { SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routeMessage", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestChannelNameResolutionByMessage(handler); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestChannelNameResolutionByMessage(router); } @Test public void channelNameResolutionByMessageConfiguredByMethodName() { SingleChannelNameRoutingTestBean testBean = new SingleChannelNameRoutingTestBean(); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessage"); - this.doTestChannelNameResolutionByMessage(handler); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessage"); + this.doTestChannelNameResolutionByMessage(router); } - private void doTestChannelNameResolutionByMessage(RouterMessageHandler handler) { + private void doTestChannelNameResolutionByMessage(MethodInvokingRouter router) { + QueueChannel fooChannel = new QueueChannel(); + QueueChannel barChannel = new QueueChannel(); + fooChannel.setBeanName("foo-channel"); + barChannel.setBeanName("bar-channel"); + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + channelRegistry.registerChannel(fooChannel); + channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - Message result1 = ((CompositeMessage) handler.handle(fooMessage)).getPayload().get(0); + assertTrue(router.route(fooMessage)); + Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - Message result2 = ((CompositeMessage) handler.handle(barMessage)).getPayload().get(0); + assertTrue(router.route(barMessage)); + Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test @@ -134,19 +152,19 @@ public class RouterMessageHandlerTests { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); SingleChannelInstanceRoutingTestBean testBean = new SingleChannelInstanceRoutingTestBean(channelRegistry); Method routingMethod = testBean.getClass().getMethod("routePayload", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestChannelInstanceResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestChannelInstanceResolutionByPayload(router, channelRegistry); } @Test public void channelInstanceResolutionByPayloadConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); SingleChannelInstanceRoutingTestBean testBean = new SingleChannelInstanceRoutingTestBean(channelRegistry); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routePayload"); - this.doTestChannelInstanceResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routePayload"); + this.doTestChannelInstanceResolutionByPayload(router, channelRegistry); } - private void doTestChannelInstanceResolutionByPayload(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestChannelInstanceResolutionByPayload(MethodInvokingRouter router, ChannelRegistry channelRegistry) { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); @@ -156,15 +174,16 @@ public class RouterMessageHandlerTests { barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); - Message result1 = ((CompositeMessage) handler.handle(fooMessage)).getPayload().get(0); + router.setChannelRegistry(channelRegistry); + assertTrue(router.route(fooMessage)); + Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - assertEquals(fooChannel, result1.getHeaders().getNextTarget()); - Message result2 = ((CompositeMessage) handler.handle(barMessage)).getPayload().get(0); + assertTrue(router.route(barMessage)); + Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertEquals(barChannel, result2.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test @@ -172,37 +191,38 @@ public class RouterMessageHandlerTests { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); SingleChannelInstanceRoutingTestBean testBean = new SingleChannelInstanceRoutingTestBean(channelRegistry); Method routingMethod = testBean.getClass().getMethod("routeMessage", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestChannelInstanceResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestChannelInstanceResolutionByMessage(router, channelRegistry); } @Test public void channelInstanceResolutionByMessageConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); SingleChannelInstanceRoutingTestBean testBean = new SingleChannelInstanceRoutingTestBean(channelRegistry); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessage"); - this.doTestChannelInstanceResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessage"); + this.doTestChannelInstanceResolutionByMessage(router, channelRegistry); } - private void doTestChannelInstanceResolutionByMessage(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestChannelInstanceResolutionByMessage(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - Message result1 = ((CompositeMessage) handler.handle(fooMessage)).getPayload().get(0); + assertTrue(router.route(fooMessage)); + Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - assertEquals(fooChannel, result1.getHeaders().getNextTarget()); - Message result2 = ((CompositeMessage) handler.handle(barMessage)).getPayload().get(0); + assertTrue(router.route(barMessage)); + Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertEquals(barChannel, result2.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test @@ -210,47 +230,44 @@ public class RouterMessageHandlerTests { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routePayload", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelNameResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelNameResolutionByPayload(router, channelRegistry); } @Test public void multiChannelNameResolutionByPayloadConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routePayload"); - this.doTestMultiChannelNameResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routePayload"); + this.doTestMultiChannelNameResolutionByPayload(router, channelRegistry); } - private void doTestMultiChannelNameResolutionByPayload(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelNameResolutionByPayload(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); - Message result1b = reply1.getPayload().get(1); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); + Message result1b = barChannel.receive(0); assertNotNull(result1a); - assertEquals("foo", result1a.getPayload()); - assertEquals("foo-channel", result1a.getHeaders().getNextTarget()); + assertEquals("foo", result1a.getPayload()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals("bar-channel", result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); - Message result2b = reply2.getPayload().get(1); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); + Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals("foo-channel", result2a.getHeaders().getNextTarget()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals("bar-channel", result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test @@ -258,239 +275,224 @@ public class RouterMessageHandlerTests { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routeMessage", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelNameResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelNameResolutionByMessage(router, channelRegistry); } @Test public void multiChannelNameResolutionByMessageConfiguredByMethodName() throws Exception { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessage"); - this.doTestMultiChannelNameResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessage"); + this.doTestMultiChannelNameResolutionByMessage(router, channelRegistry); } - private void doTestMultiChannelNameResolutionByMessage(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelNameResolutionByMessage(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); - assertEquals("foo-channel", result1a.getHeaders().getNextTarget()); - Message result1b = reply1.getPayload().get(1); + Message result1b = barChannel.receive(0); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals("bar-channel", result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals("foo-channel", result2a.getHeaders().getNextTarget()); - Message result2b = reply2.getPayload().get(1); + Message result2b = barChannel.receive(0); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals("bar-channel", result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test - public void testMultiChannelNameArrayResolutionByMessageConfiguredByMethodReference() throws Exception { + public void multiChannelNameArrayResolutionByMessageConfiguredByMethodReference() throws Exception { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); Method routingMethod = testBean.getClass().getMethod("routeMessageToArray", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelNameArrayResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelNameArrayResolutionByMessage(router, channelRegistry); } @Test - public void testMultiChannelNameArrayResolutionByMessageConfiguredByMethodName() { + public void multiChannelNameArrayResolutionByMessageConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelNameRoutingTestBean testBean = new MultiChannelNameRoutingTestBean(); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessageToArray"); - this.doTestMultiChannelNameArrayResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessageToArray"); + this.doTestMultiChannelNameArrayResolutionByMessage(router, channelRegistry); } - private void doTestMultiChannelNameArrayResolutionByMessage(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelNameArrayResolutionByMessage(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); - assertEquals("foo-channel", result1a.getHeaders().getNextTarget()); - Message result1b = reply1.getPayload().get(1); + Message result1b = barChannel.receive(0); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals("bar-channel", result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals("foo-channel", result2a.getHeaders().getNextTarget()); - Message result2b = reply2.getPayload().get(1); + Message result2b = barChannel.receive(0); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals("bar-channel", result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test - public void testMultiChannelListResolutionByPayloadConfiguredByMethodReference() throws Exception { + public void multiChannelListResolutionByPayloadConfiguredByMethodReference() throws Exception { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); Method routingMethod = testBean.getClass().getMethod("routePayload", String.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelListResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelListResolutionByPayload(router, channelRegistry); } @Test - public void testMultiChannelListResolutionByPayloadConfiguredByMethodName() { + public void multiChannelListResolutionByPayloadConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routePayload"); - this.doTestMultiChannelListResolutionByPayload(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routePayload"); + this.doTestMultiChannelListResolutionByPayload(router, channelRegistry); } - private void doTestMultiChannelListResolutionByPayload(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelListResolutionByPayload(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); - Message result1b = reply1.getPayload().get(1); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); + Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); - assertEquals(fooChannel, result1a.getHeaders().getNextTarget()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals(barChannel, result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); - Message result2b = reply2.getPayload().get(1); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); + Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals(fooChannel, result2a.getHeaders().getNextTarget()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals(barChannel, result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test - public void testMultiChannelListResolutionByMessageConfiguredByMethodReference() throws Exception { + public void multiChannelListResolutionByMessageConfiguredByMethodReference() throws Exception { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); Method routingMethod = testBean.getClass().getMethod("routeMessage", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelListResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelListResolutionByMessage(router, channelRegistry); } @Test - public void testMultiChannelListResolutionByMessageConfiguredByMethodName() { + public void multiChannelListResolutionByMessageConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessage"); - this.doTestMultiChannelListResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessage"); + this.doTestMultiChannelListResolutionByMessage(router, channelRegistry); } - private void doTestMultiChannelListResolutionByMessage(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelListResolutionByMessage(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); - Message result1b = reply1.getPayload().get(1); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); + Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); - assertEquals(fooChannel, result1a.getHeaders().getNextTarget()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals(barChannel, result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); - Message result2b = reply2.getPayload().get(1); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); + Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals(fooChannel, result2a.getHeaders().getNextTarget()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals(barChannel, result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } @Test - public void testMultiChannelArrayResolutionByMessageConfiguredByMethodReference() throws Exception { + public void multiChannelArrayResolutionByMessageConfiguredByMethodReference() throws Exception { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); Method routingMethod = testBean.getClass().getMethod("routeMessageToArray", Message.class); - RouterMessageHandler handler = new RouterMessageHandler(testBean, routingMethod); - this.doTestMultiChannelArrayResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, routingMethod); + this.doTestMultiChannelArrayResolutionByMessage(router, channelRegistry); } @Test - public void testMultiChannelArrayResolutionByMessageConfiguredByMethodName() { + public void multiChannelArrayResolutionByMessageConfiguredByMethodName() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelInstanceRoutingTestBean testBean = new MultiChannelInstanceRoutingTestBean(channelRegistry); - RouterMessageHandler handler = new RouterMessageHandler(testBean, "routeMessageToArray"); - this.doTestMultiChannelArrayResolutionByMessage(handler, channelRegistry); + MethodInvokingRouter router = new MethodInvokingRouter(testBean, "routeMessageToArray"); + this.doTestMultiChannelArrayResolutionByMessage(router, channelRegistry); } - private void doTestMultiChannelArrayResolutionByMessage(RouterMessageHandler handler, ChannelRegistry channelRegistry) { + private void doTestMultiChannelArrayResolutionByMessage(MethodInvokingRouter router, ChannelRegistry channelRegistry) { QueueChannel fooChannel = new QueueChannel(); QueueChannel barChannel = new QueueChannel(); fooChannel.setBeanName("foo-channel"); barChannel.setBeanName("bar-channel"); channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); + router.setChannelRegistry(channelRegistry); Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - CompositeMessage reply1 = (CompositeMessage) handler.handle(fooMessage); - Message result1a = reply1.getPayload().get(0); - Message result1b = reply1.getPayload().get(1); + assertTrue(router.route(fooMessage)); + Message result1a = fooChannel.receive(0); + Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); - assertEquals(fooChannel, result1a.getHeaders().getNextTarget()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertEquals(barChannel, result1b.getHeaders().getNextTarget()); - CompositeMessage reply2 = (CompositeMessage) handler.handle(barMessage); - Message result2a = reply2.getPayload().get(0); - Message result2b = reply2.getPayload().get(1); + assertTrue(router.route(barMessage)); + Message result2a = fooChannel.receive(0); + Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); - assertEquals(fooChannel, result2a.getHeaders().getNextTarget()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertEquals(barChannel, result2b.getHeaders().getNextTarget()); - assertNull(handler.handle(badMessage)); + assertFalse(router.route(badMessage)); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java index 82898061c2..14b344d3e8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java @@ -30,7 +30,7 @@ import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageHandlingException; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.StringMessage; /** @@ -39,7 +39,7 @@ import org.springframework.integration.message.StringMessage; public class MultiChannelRouterTests { @Test - public void testRoutingWithChannelResolver() { + public void routeWithChannelResolver() { final QueueChannel channel1 = new QueueChannel(); final QueueChannel channel2 = new QueueChannel(); MultiChannelResolver channelResolver = new MultiChannelResolver() { @@ -54,7 +54,7 @@ public class MultiChannelRouterTests { router.setChannelResolver(channelResolver); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -64,7 +64,7 @@ public class MultiChannelRouterTests { } @Test - public void testRoutingWithChannelNameResolver() { + public void routeWithChannelNameResolver() { MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { public String[] resolve(Message message) { return new String[] {"channel1", "channel2"}; @@ -82,7 +82,7 @@ public class MultiChannelRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -91,8 +91,8 @@ public class MultiChannelRouterTests { assertEquals("test", result2.getPayload()); } - @Test(expected=ConfigurationException.class) - public void testConfiguringBothChannelResolverAndChannelNameResolverIsNotAllowed() { + @Test(expected = ConfigurationException.class) + public void configuringBothChannelResolverAndChannelNameResolverIsNotAllowed() { MultiChannelResolver channelResolver = new MultiChannelResolver() { public List resolve(Message message) { return null; @@ -109,37 +109,8 @@ public class MultiChannelRouterTests { router.afterPropertiesSet(); } - @Test - public void testChannelResolutionFailureIgnoredByDefault() { - MultiChannelResolver channelResolver = new MultiChannelResolver() { - public List resolve(Message message) { - return null; - } - }; - MultiChannelRouter router = new MultiChannelRouter(); - router.setChannelResolver(channelResolver); - router.afterPropertiesSet(); - Message message = new StringMessage("test"); - router.handle(message); - } - - @Test(expected=MessageHandlingException.class) - public void testChannelResolutionFailureThrowsExceptionWhenResolutionRequired() { - MultiChannelResolver channelResolver = new MultiChannelResolver() { - public List resolve(Message message) { - return null; - } - }; - MultiChannelRouter router = new MultiChannelRouter(); - router.setChannelResolver(channelResolver); - router.setResolutionRequired(true); - router.afterPropertiesSet(); - Message message = new StringMessage("test"); - router.handle(message); - } - - @Test - public void testChannelNameResolutionFailureIgnoredByDefault() { + @Test(expected = MessageDeliveryException.class) + public void channelNameLookupFailure() { MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { public String[] resolve(Message message) { return new String[] {"noSuchChannel"}; @@ -151,52 +122,11 @@ public class MultiChannelRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); } - @Test(expected=MessageHandlingException.class) - public void testChannelNameResolutionFailureThrowsExceptionWhenResolutionRequired() { - MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { - public String[] resolve(Message message) { - return null; - } - }; - ChannelRegistry channelRegistry = new DefaultChannelRegistry(); - MultiChannelRouter router = new MultiChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.setChannelRegistry(channelRegistry); - router.setResolutionRequired(true); - router.afterPropertiesSet(); - Message message = new StringMessage("test"); - router.handle(message); - } - - @Test(expected=ConfigurationException.class) - public void testChannelRegistryIsRequiredWhenUsingChannelNameResolver() { - MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { - public String[] resolve(Message message) { - return new String[] {"notImportant"}; - } - }; - MultiChannelRouter router = new MultiChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.resolveChannels(new StringMessage("this should fail")); - } - - @Test(expected=ConfigurationException.class) - public void testValidateChannelRegistryIsPresentWhenUsingChannelNameResolver() { - MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { - public String[] resolve(Message message) { - return new String[] {"notImportant"}; - } - }; - MultiChannelRouter router = new MultiChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.afterPropertiesSet(); - } - - @Test(expected=ConfigurationException.class) - public void testChannelResolverIsRequired() { + @Test(expected = ConfigurationException.class) + public void channelResolverIsRequired() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); MultiChannelRouter router = new MultiChannelRouter(); router.setChannelRegistry(channelRegistry); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java index 5c62c698c8..79e38716f0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java @@ -47,8 +47,8 @@ public class PayloadTypeRouterTests { router.afterPropertiesSet(); Message message1 = new StringMessage("test"); Message message2 = new GenericMessage(123); - router.handle(message1); - router.handle(message2); + router.route(message1); + router.route(message2); Message result1 = stringChannel.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -69,8 +69,8 @@ public class PayloadTypeRouterTests { router.afterPropertiesSet(); Message message1 = new StringMessage("test"); Message message2 = new GenericMessage(123); - router.handle(message1); - router.handle(message2); + router.route(message1); + router.route(message2); Message result1 = stringChannel.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java index d844ed495c..05ee1739db 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java @@ -39,7 +39,7 @@ import org.springframework.integration.message.StringMessage; public class RecipientListRouterTests { @Test - public void testRoutingWithChannelList() { + public void routeWithChannelList() { QueueChannel channel1 = new QueueChannel(); QueueChannel channel2 = new QueueChannel(); List channels = new ArrayList(); @@ -49,7 +49,7 @@ public class RecipientListRouterTests { router.setChannels(channels); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -59,7 +59,7 @@ public class RecipientListRouterTests { } @Test - public void testRoutingWithChannelNames() { + public void routeWithChannelNames() { QueueChannel channel1 = new QueueChannel(); QueueChannel channel2 = new QueueChannel(); channel1.setBeanName("channel1"); @@ -72,7 +72,7 @@ public class RecipientListRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -82,7 +82,7 @@ public class RecipientListRouterTests { } @Test - public void testRoutingToSingleChannelByName() { + public void routeToSingleChannelByName() { QueueChannel channel1 = new QueueChannel(); QueueChannel channel2 = new QueueChannel(); channel1.setBeanName("channel1"); @@ -95,7 +95,7 @@ public class RecipientListRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -103,8 +103,8 @@ public class RecipientListRouterTests { assertNull(result2); } - @Test(expected=ConfigurationException.class) - public void testConfigurationExceptionWhenBothChannelsAndNamesAreProvided() { + @Test(expected = ConfigurationException.class) + public void configurationExceptionWhenBothChannelsAndNamesAreProvided() { QueueChannel channel1 = new QueueChannel(); QueueChannel channel2 = new QueueChannel(); channel1.setBeanName("channel1"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java index 1405c92ef9..8e0a12053b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java @@ -65,7 +65,7 @@ public class RootCauseErrorMessageRouterTests { router.setChannelMappings(channelMappings); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); @@ -87,7 +87,7 @@ public class RootCauseErrorMessageRouterTests { router.setChannelMappings(channelMappings); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(runtimeExceptionChannel.receive(1000)); assertNull(illegalArgumentChannel.receive(0)); assertNull(defaultChannel.receive(0)); @@ -108,7 +108,7 @@ public class RootCauseErrorMessageRouterTests { router.setChannelMappings(channelMappings); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(messageHandlingExceptionChannel.receive(1000)); assertNull(runtimeExceptionChannel.receive(0)); assertNull(illegalArgumentChannel.receive(0)); @@ -125,14 +125,14 @@ public class RootCauseErrorMessageRouterTests { RootCauseErrorMessageRouter router = new RootCauseErrorMessageRouter(); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(defaultChannel.receive(1000)); assertNull(runtimeExceptionChannel.receive(0)); assertNull(illegalArgumentChannel.receive(0)); assertNull(messageHandlingExceptionChannel.receive(0)); } - @Test(expected=MessageHandlingException.class) + @Test(expected = MessageDeliveryException.class) public void testNoMatchAndNoDefaultChannel() { Message failedMessage = new StringMessage("foo"); IllegalArgumentException rootCause = new IllegalArgumentException("bad argument"); @@ -145,8 +145,9 @@ public class RootCauseErrorMessageRouterTests { channelMappings.put(MessageDeliveryException.class, messageDeliveryExceptionChannel); router.setChannelMappings(channelMappings); router.afterPropertiesSet(); - router.setResolutionRequired(true); - router.handle(message); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setResolutionRequired(true); + endpoint.send(message); } @Test @@ -165,7 +166,7 @@ public class RootCauseErrorMessageRouterTests { router.setChannelMappings(channelMappings); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); @@ -187,7 +188,7 @@ public class RootCauseErrorMessageRouterTests { router.setChannelMappings(channelMappings); router.setDefaultChannel(defaultChannel); router.afterPropertiesSet(); - router.handle(message); + router.route(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java new file mode 100644 index 0000000000..ead82adfd4 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java @@ -0,0 +1,198 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.router; + +import static org.junit.Assert.assertFalse; + +import java.util.Collections; +import java.util.List; + +import org.junit.Test; + +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.DefaultChannelRegistry; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.message.MessagingException; +import org.springframework.integration.message.StringMessage; + +/** + * @author Mark Fisher + */ +public class RouterEndpointTests { + + @Test + public void nullChannelIgnoredByDefault() { + MultiChannelResolver channelResolver = new MultiChannelResolver() { + public List resolve(Message message) { + return null; + } + }; + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelResolver(channelResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + Message message = new StringMessage("test"); + assertFalse(endpoint.send(message)); + } + + @Test(expected = MessageDeliveryException.class) + public void nullChannelThrowsExceptionWhenResolutionRequired() { + MultiChannelResolver channelResolver = new MultiChannelResolver() { + public List resolve(Message message) { + return null; + } + }; + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelResolver(channelResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setResolutionRequired(true); + Message message = new StringMessage("test"); + endpoint.send(message); + } + + @Test + public void emptyChannelListIgnoredByDefault() { + MultiChannelResolver channelResolver = new MultiChannelResolver() { + public List resolve(Message message) { + return Collections.emptyList(); + } + }; + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelResolver(channelResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + Message message = new StringMessage("test"); + assertFalse(endpoint.send(message)); + } + + @Test(expected = MessageDeliveryException.class) + public void emptyChannelListThrowsExceptionWhenResolutionRequired() { + MultiChannelResolver channelResolver = new MultiChannelResolver() { + public List resolve(Message message) { + return Collections.emptyList(); + } + }; + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelResolver(channelResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setResolutionRequired(true); + Message message = new StringMessage("test"); + endpoint.send(message); + } + + @Test + public void nullChannelNameArrayIgnoredByDefault() { + MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { + public String[] resolve(Message message) { + return null; + } + }; + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setChannelRegistry(channelRegistry); + Message message = new StringMessage("test"); + assertFalse(endpoint.send(message)); + } + + @Test(expected = MessageDeliveryException.class) + public void nullChannelNameArrayThrowsExceptionWhenResolutionRequired() { + MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { + public String[] resolve(Message message) { + return null; + } + }; + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setChannelRegistry(channelRegistry); + endpoint.setResolutionRequired(true); + Message message = new StringMessage("test"); + endpoint.send(message); + } + + + @Test + public void emptyChannelNameArrayIgnoredByDefault() { + MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { + public String[] resolve(Message message) { + return new String[] {}; + } + }; + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setChannelRegistry(channelRegistry); + Message message = new StringMessage("test"); + assertFalse(endpoint.send(message)); + } + + @Test(expected = MessageDeliveryException.class) + public void emptyChannelNameArrayThrowsExceptionWhenResolutionRequired() { + MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { + public String[] resolve(Message message) { + return new String[] {}; + } + }; + ChannelRegistry channelRegistry = new DefaultChannelRegistry(); + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + router.afterPropertiesSet(); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.setChannelRegistry(channelRegistry); + endpoint.setResolutionRequired(true); + Message message = new StringMessage("test"); + endpoint.send(message); + } + + @Test(expected = MessagingException.class) + public void testChannelRegistryIsRequiredWhenUsingChannelNameResolverWithSingleChannelRouter() { + ChannelNameResolver channelNameResolver = new ChannelNameResolver() { + public String resolve(Message message) { + return "notImportant"; + } + }; + SingleChannelRouter router = new SingleChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.send(new StringMessage("this should fail")); + } + + @Test(expected = MessagingException.class) + public void testChannelRegistryIsRequiredWhenUsingChannelNameResolverWithMultiChannelRouter() { + MultiChannelNameResolver channelNameResolver = new MultiChannelNameResolver() { + public String[] resolve(Message message) { + return new String[] { "notImportant" }; + } + }; + MultiChannelRouter router = new MultiChannelRouter(); + router.setChannelNameResolver(channelNameResolver); + RouterEndpoint endpoint = new RouterEndpoint(router); + endpoint.send(new StringMessage("this should fail")); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java index 04e36f8c66..69be1b8a03 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java @@ -17,6 +17,7 @@ package org.springframework.integration.router; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import org.junit.Test; @@ -27,7 +28,7 @@ import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageHandlingException; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.StringMessage; /** @@ -36,7 +37,7 @@ import org.springframework.integration.message.StringMessage; public class SingleChannelRouterTests { @Test - public void testRoutingWithChannelResolver() { + public void routeWithChannelResolver() { final QueueChannel channel = new QueueChannel(); ChannelResolver channelResolver = new ChannelResolver() { public MessageChannel resolve(Message message) { @@ -47,14 +48,14 @@ public class SingleChannelRouterTests { router.setChannelResolver(channelResolver); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result = channel.receive(25); assertNotNull(result); assertEquals("test", result.getPayload()); } @Test - public void testRoutingWithChannelNameResolver() { + public void routeWithChannelNameResolver() { ChannelNameResolver channelNameResolver = new ChannelNameResolver() { public String resolve(Message message) { return "testChannel"; @@ -69,14 +70,14 @@ public class SingleChannelRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); Message result = channel.receive(25); assertNotNull(result); assertEquals("test", result.getPayload()); } - @Test(expected=ConfigurationException.class) - public void testConfiguringBothChannelResolverAndChannelNameResolverIsNotAllowed() { + @Test(expected = ConfigurationException.class) + public void configuringBothChannelResolverAndChannelNameResolverIsNotAllowed() { ChannelResolver channelResolver = new ChannelResolver() { public MessageChannel resolve(Message message) { return new QueueChannel(); @@ -94,7 +95,7 @@ public class SingleChannelRouterTests { } @Test - public void testChannelResolutionFailureIgnoredByDefault() { + public void nullChannelResult() { ChannelResolver channelResolver = new ChannelResolver() { public MessageChannel resolve(Message message) { return null; @@ -104,26 +105,11 @@ public class SingleChannelRouterTests { router.setChannelResolver(channelResolver); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + assertFalse(router.route(message)); } - @Test(expected=MessageHandlingException.class) - public void testChannelResolutionFailureThrowsExceptionWhenResolutionRequired() { - ChannelResolver channelResolver = new ChannelResolver() { - public MessageChannel resolve(Message message) { - return null; - } - }; - SingleChannelRouter router = new SingleChannelRouter(); - router.setChannelResolver(channelResolver); - router.setResolutionRequired(true); - router.afterPropertiesSet(); - Message message = new StringMessage("test"); - router.handle(message); - } - - @Test - public void testChannelNameResolutionFailureIgnoredByDefault() { + @Test(expected = MessageDeliveryException.class) + public void channelNameResolutionFailure() { ChannelNameResolver channelNameResolver = new ChannelNameResolver() { public String resolve(Message message) { return "noSuchChannel"; @@ -135,51 +121,10 @@ public class SingleChannelRouterTests { router.setChannelRegistry(channelRegistry); router.afterPropertiesSet(); Message message = new StringMessage("test"); - router.handle(message); + router.route(message); } - @Test(expected=MessageHandlingException.class) - public void testChannelNameResolutionFailureThrowsExceptionWhenResolutionRequired() { - ChannelNameResolver channelNameResolver = new ChannelNameResolver() { - public String resolve(Message message) { - return "noSuchChannel"; - } - }; - ChannelRegistry channelRegistry = new DefaultChannelRegistry(); - SingleChannelRouter router = new SingleChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.setChannelRegistry(channelRegistry); - router.setResolutionRequired(true); - router.afterPropertiesSet(); - Message message = new StringMessage("test"); - router.handle(message); - } - - @Test(expected=ConfigurationException.class) - public void testChannelRegistryIsRequiredWhenUsingChannelNameResolver() { - ChannelNameResolver channelNameResolver = new ChannelNameResolver() { - public String resolve(Message message) { - return "notImportant"; - } - }; - SingleChannelRouter router = new SingleChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.resolveChannels(new StringMessage("this should fail")); - } - - @Test(expected=ConfigurationException.class) - public void testValidateChannelRegistryIsPresentWhenUsingChannelNameResolver() { - ChannelNameResolver channelNameResolver = new ChannelNameResolver() { - public String resolve(Message message) { - return "notImportant"; - } - }; - SingleChannelRouter router = new SingleChannelRouter(); - router.setChannelNameResolver(channelNameResolver); - router.afterPropertiesSet(); - } - - @Test(expected=ConfigurationException.class) + @Test(expected = ConfigurationException.class) public void testChannelResolverIsRequired() { ChannelRegistry channelRegistry = new DefaultChannelRegistry(); SingleChannelRouter router = new SingleChannelRouter();