diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java index 1f133412bb..f61ff6bd32 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java @@ -16,19 +16,94 @@ package org.springframework.integration.config; -import org.springframework.integration.handler.MessageHandler; -import org.springframework.integration.splitter.SplitterMessageHandler; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.config.BeanDefinitionHolder; +import org.springframework.beans.factory.parsing.BeanComponentDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.splitter.MethodInvokingSplitter; +import org.springframework.integration.splitter.SplitterEndpoint; +import org.springframework.util.StringUtils; +import org.springframework.util.xml.DomUtils; /** * Parser for the <splitter/> element. * * @author Mark Fisher */ -public class SplitterParser extends AbstractMessageEndpointParser { +public class SplitterParser extends AbstractSingleBeanDefinitionParser { + + protected static final String REF_ATTRIBUTE = "ref"; + + protected static final String METHOD_ATTRIBUTE = "method"; + + protected static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; + + protected static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; + + private static final String POLLER_ELEMENT = "poller"; + + private static final String ERROR_HANDLER_ATTRIBUTE = "error-handler"; + @Override - protected Class getHandlerAdapterClass() { - return SplitterMessageHandler.class; + protected Class getBeanClass(Element element) { + return SplitterEndpoint.class; + } + + @Override + protected boolean shouldGenerateId() { + return false; + } + + @Override + protected boolean shouldGenerateIdAsFallback() { + return true; + } + + @Override + protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + String ref = element.getAttribute(REF_ATTRIBUTE); + if (!StringUtils.hasText(ref)) { + throw new ConfigurationException("The '" + REF_ATTRIBUTE + "' attribute is required."); + } + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + String method = element.getAttribute(METHOD_ATTRIBUTE); + String adapterBeanName = this.parseAdapter(ref, method, element, parserContext); + builder.addConstructorArgReference(adapterBeanName); + } + else { + builder.addConstructorArgReference(ref); + } + String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE); + if (!StringUtils.hasText(inputChannel)) { + throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required"); + } + Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); + if (pollerElement != null) { + String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext); + builder.addPropertyReference("source", pollerBeanName); + } + else { + builder.addPropertyValue("inputChannelName", inputChannel); + } + IntegrationNamespaceUtils.setReferenceIfAttributeDefined( + builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "target"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); + } + + private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingSplitter.class); + builder.addConstructorArgReference(ref); + builder.addConstructorArgValue(method); + String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName(builder.getBeanDefinition(), parserContext.getRegistry()); + BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName); + parserContext.registerBeanComponent(new BeanComponentDefinition(holder)); + return adapterBeanName; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingMethodInvoker.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingMethodInvoker.java new file mode 100644 index 0000000000..acae956265 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingMethodInvoker.java @@ -0,0 +1,184 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.message; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.util.DefaultMethodInvoker; +import org.springframework.integration.util.MethodInvoker; +import org.springframework.integration.util.NameResolvingMethodInvoker; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; +import org.springframework.util.StringUtils; + +/** + * A base or helper class for any Messaging component that acts as an adapter + * by invoking a "plain" (not Message-aware) method for a given target object. + * The target Object is mandatory, and either a {@link Method} reference or a + * 'methodName' must be provided. If no Object and Method are provided, the + * handler will simply process the request Message's payload. + * + * @author Mark Fisher + */ +public class MessageMappingMethodInvoker implements InitializingBean { + + protected static final Log logger = LogFactory.getLog(MessageMappingMethodInvoker.class); + + private volatile boolean methodExpectsMessage; + + private volatile Object object; + + private volatile Method method; + + private volatile String methodName; + + private volatile MessageMappingParameterResolver parameterResolver; + + private volatile MethodInvoker invoker; + + private volatile boolean initialized; + + private final Object initializationMonitor = new Object(); + + + public MessageMappingMethodInvoker(Object object, Method method) { + Assert.notNull(object, "object must not be null"); + Assert.notNull(method, "method must not be null"); + this.object = object; + this.method = method; + this.methodName = method.getName(); + } + + public MessageMappingMethodInvoker(Object object, String methodName) { + Assert.notNull(object, "object must not be null"); + Assert.notNull(methodName, "methodName must not be null"); + this.object = object; + this.methodName = methodName; + } + + + public void afterPropertiesSet() { + synchronized (this.initializationMonitor) { + if (this.initialized) { + return; + } + if (this.method == null) { + final List candidates = new ArrayList(); + ReflectionUtils.doWithMethods(this.object.getClass(), new ReflectionUtils.MethodCallback() { + public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { + if (method.getName().equals(MessageMappingMethodInvoker.this.methodName)) { + candidates.add(method); + } + } + }); + if (candidates.size() == 0) { + throw new ConfigurationException("no such method '" + this.methodName + + "' on target class [" + this.object.getClass() + "]"); + } + if (candidates.size() == 1) { + this.method = candidates.get(0); + } + } + if (this.method != null) { + Class[] parameterTypes = this.method.getParameterTypes(); + if (parameterTypes.length == 0) { + throw new ConfigurationException("method must accept at least one parameter"); + } + if (parameterTypes.length == 1 && Message.class.isAssignableFrom(parameterTypes[0])) { + this.methodExpectsMessage = true; + } + this.invoker = new DefaultMethodInvoker(this.object, this.method); + this.parameterResolver = new MessageMappingParameterResolver(this.method); + } + else { + // TODO: resolve the candidate method and/or create a dynamic resolver + this.invoker = new NameResolvingMethodInvoker(this.object, this.methodName); + } + this.initialized = true; + } + } + + public Object invokeMethod(Message message) { + if (message == null || message.getPayload() == null) { + if (logger.isDebugEnabled()) { + logger.debug("received null message or payload"); + } + return null; + } + if (!this.initialized) { + this.afterPropertiesSet(); + } + if (this.invoker == null) { + return message.getPayload(); + } + Object args[] = null; + Object mappingResult = this.methodExpectsMessage + ? message : this.resolveParameters(message); + if (mappingResult != null && mappingResult.getClass().isArray() + && (Object.class.isAssignableFrom(mappingResult.getClass().getComponentType()))) { + args = (Object[]) mappingResult; + } + else { + args = new Object[] { mappingResult }; + } + try { + Object result = null; + try { + result = this.invoker.invokeMethod(args); + } + catch (NoSuchMethodException e) { + try { + result = this.invoker.invokeMethod(message); + this.methodExpectsMessage = true; + } + catch (NoSuchMethodException e2) { + throw new MessageHandlingException(message, "unable to resolve method for args: " + + StringUtils.arrayToCommaDelimitedString(args)); + } + } + if (result == null) { + return null; + } + return result; + } + catch (InvocationTargetException e) { + throw new MessageHandlingException(message, "Handler method '" + + this.methodName + "' threw an Exception.", e.getTargetException()); + } + catch (Throwable e) { + throw new MessageHandlingException(message, "Failed to invoke handler method '" + + this.methodName + "' with arguments: " + ObjectUtils.nullSafeToString(args), e); + } + } + + private Object[] resolveParameters(Message message) { + if (this.parameterResolver != null) { + return this.parameterResolver.resolveParameters(message); + } + return new Object[] { message.getPayload() }; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingParameterResolver.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingParameterResolver.java new file mode 100644 index 0000000000..8f991da66d --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageMappingParameterResolver.java @@ -0,0 +1,167 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.message; + +import java.lang.reflect.Method; +import java.util.Map; +import java.util.Properties; + +import org.springframework.core.GenericTypeResolver; +import org.springframework.core.LocalVariableTableParameterNameDiscoverer; +import org.springframework.core.MethodParameter; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.integration.annotation.Header; +import org.springframework.util.Assert; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; + +/** + * Prepares arguments for handler methods. The method parameters are matched + * against the Message payload as well as its headers. If a method parameter + * is annotated with {@link Header @Header}, the annotation's value will be + * used as a header name. If such an annotation contains no value, then the + * parameter name will be used as long as the information is available in the + * class file (requires compilation with debug settings for parameter names). + * If the {@link Header @Header} annotation is not present, then the parameter + * will typically match the Message payload. However, if a Map or Properties + * object is expected, and the paylaod is not itself assignable to that type, + * then the MessageHeaders' values will be passed in the case of a Map-typed + * parameter, or the MessageHeaders' String-based values will be passed in the + * case of a Properties-typed parameter. + * + * @author Mark Fisher + */ +public class MessageMappingParameterResolver { + + private final Method method; + + private volatile MethodParameterMetadata[] parameterMetadata; + + private final ParameterNameDiscoverer parameterNameDiscoverer = new LocalVariableTableParameterNameDiscoverer(); + + + public MessageMappingParameterResolver(Method method) { + Assert.notNull(method, "method must not be null"); + this.method = method; + this.initializeParameterMetadata(); + } + + + public Object[] resolveParameters(Message message) { + if (message == null) { + return null; + } + if (message.getPayload() == null) { + throw new IllegalArgumentException("Message payload must not be null."); + } + if (ObjectUtils.isEmpty(this.parameterMetadata)) { + return new Object[] { message.getPayload() }; + } + Object[] args = new Object[this.parameterMetadata.length]; + for (int i = 0; i < this.parameterMetadata.length; i++) { + MethodParameterMetadata metadata = this.parameterMetadata[i]; + Class expectedType = metadata.type; + if (expectedType.equals(Header.class)) { + Object value = message.getHeaders().get(metadata.key); + if (value == null && metadata.required) { + throw new MessageHandlingException(message, + "required header '" + metadata.key + "' not available"); + } + args[i] = value; + } + else if (expectedType.isAssignableFrom(message.getClass())) { + args[i] = message; + } + else if (expectedType.isAssignableFrom(message.getPayload().getClass())) { + args[i] = message.getPayload(); + } + else if (expectedType.equals(Map.class)) { + args[i] = message.getHeaders(); + } + else if (expectedType.equals(Properties.class)) { + args[i] = this.getStringTypedHeaders(message); + } + else { + args[i] = message.getPayload(); + } + } + return args; + } + + private void initializeParameterMetadata() { + Class[] paramTypes = this.method.getParameterTypes(); + this.parameterMetadata = new MethodParameterMetadata[paramTypes.length]; + for (int i = 0; i < parameterMetadata.length; i++) { + MethodParameter methodParam = new MethodParameter(this.method, i); + methodParam.initParameterNameDiscovery(this.parameterNameDiscoverer); + GenericTypeResolver.resolveParameterType(methodParam, this.method.getDeclaringClass()); + Object[] paramAnnotations = methodParam.getParameterAnnotations(); + String headerName = null; + for (int j = 0; j < paramAnnotations.length; j++) { + if (Header.class.isInstance(paramAnnotations[j])) { + Header headerAnnotation = (Header) paramAnnotations[j]; + headerName = this.resolveParameterNameIfNecessary(headerAnnotation.value(), methodParam); + parameterMetadata[i] = new MethodParameterMetadata(Header.class, headerName, headerAnnotation.required()); + } + } + if (headerName == null) { + parameterMetadata[i] = new MethodParameterMetadata(methodParam.getParameterType(), null, false); + } + } + } + + private Properties getStringTypedHeaders(Message message) { + Properties properties = new Properties(); + MessageHeaders headers = message.getHeaders(); + for (String key : headers.keySet()) { + Object value = headers.get(key); + if (value instanceof String) { + properties.setProperty(key, (String) value); + } + } + return properties; + } + + private String resolveParameterNameIfNecessary(String paramName, MethodParameter methodParam) { + if (!StringUtils.hasText(paramName)) { + paramName = methodParam.getParameterName(); + if (paramName == null) { + throw new IllegalStateException("No parameter name specified and not available in class file."); + } + } + return paramName; + } + + + private static class MethodParameterMetadata { + + private final Class type; + + private final String key; + + private final boolean required; + + + MethodParameterMetadata(Class type, String key, boolean required) { + this.type = type; + this.key = key; + this.required = required; + } + + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractSplitter.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractSplitter.java new file mode 100644 index 0000000000..b4ee40ebd1 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractSplitter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageHeaders; + +/** + * @author Mark Fisher + */ +public abstract class AbstractSplitter implements Splitter { + + public List> split(Message message) { + Object result = this.splitMessage(message); + MessageHeaders requestHeaders = message.getHeaders(); + List> results = new ArrayList>(); + if (result instanceof Collection) { + Collection items = (Collection) result; + int sequenceNumber = 0; + int sequenceSize = items.size(); + for (Object item : items) { + results.add(this.createSplitMessage(item, requestHeaders, ++sequenceNumber, sequenceSize)); + } + } + else if (result.getClass().isArray()) { + Object[] items = (Object[]) result; + int sequenceNumber = 0; + int sequenceSize = items.length; + for (Object item : items) { + results.add(this.createSplitMessage(item, requestHeaders, ++sequenceNumber, sequenceSize)); + } + } + else { + results.add(this.createSplitMessage(result, requestHeaders, 1, 1)); + } + if (results.isEmpty()) { + return null; + } + return results; + } + + protected abstract Object splitMessage(Message message); + + private Message createSplitMessage(Object item, MessageHeaders requestHeaders, int sequenceNumber, int sequenceSize) { + if (item instanceof Message) { + return setSplitMessageHeaders(MessageBuilder.fromMessage((Message) item), + requestHeaders.getId(), sequenceNumber, sequenceSize); + } + return setSplitMessageHeaders(MessageBuilder.fromPayload(item), + requestHeaders.getId(), sequenceNumber, sequenceSize); + } + + private Message setSplitMessageHeaders(MessageBuilder builder, Object requestMessageId, int sequenceNumber, int sequenceSize) { + return builder.setCorrelationId(requestMessageId) + .setSequenceNumber(sequenceNumber) + .setSequenceSize(sequenceSize).build(); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/DefaultSplitter.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/DefaultSplitter.java new file mode 100644 index 0000000000..a039830f17 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/DefaultSplitter.java @@ -0,0 +1,55 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; + +import org.springframework.integration.message.Message; + +/** + * @author Mark Fisher + */ +public class DefaultSplitter extends AbstractSplitter { + + private volatile String delimiters; + + + /** + * Set delimiters to use for tokenizing String values. The default is + * null indicating that no tokenization should occur. If + * delimiters are provided, they will be applied to any String payload. + */ + public void setDelimiters(String delimiters) { + this.delimiters = delimiters; + } + + public Object splitMessage(Message message) { + Object payload = message.getPayload(); + if (payload instanceof String && this.delimiters != null) { + List tokens = new ArrayList(); + StringTokenizer tokenizer = new StringTokenizer((String) payload, this.delimiters); + while (tokenizer.hasMoreElements()) { + tokens.add(tokenizer.nextToken()); + } + return tokens; + } + return payload; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/MethodInvokingSplitter.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/MethodInvokingSplitter.java new file mode 100644 index 0000000000..302bb1aaa7 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/MethodInvokingSplitter.java @@ -0,0 +1,57 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import java.lang.reflect.Method; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageMappingMethodInvoker; + +/** + * A {@link Splitter} implementation that invokes the specified method + * on the given object. The method's return value will be split if it + * is a Collection or Array. If the return value is not a Collection or + * Array, then the single Object will be returned as the payload of a + * single reply Message. + * + * @author Mark Fisher + */ +public class MethodInvokingSplitter extends AbstractSplitter implements Splitter, InitializingBean { + + private final MessageMappingMethodInvoker invoker; + + + public MethodInvokingSplitter(Object object, Method method) { + this.invoker = new MessageMappingMethodInvoker(object, method); + } + + public MethodInvokingSplitter(Object object, String methodName) { + this.invoker = new MessageMappingMethodInvoker(object, methodName); + } + + + public void afterPropertiesSet() throws Exception { + this.invoker.afterPropertiesSet(); + } + + @Override + protected Object splitMessage(Message message) { + return this.invoker.invokeMethod(message); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/Splitter.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/Splitter.java new file mode 100644 index 0000000000..11c24104c6 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/Splitter.java @@ -0,0 +1,33 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import java.util.List; + +import org.springframework.integration.message.Message; + +/** + * Strategy interface for splitting a single {@link Message} + * into multiple Messages. + * + * @author Mark Fisher + */ +public interface Splitter { + + List> split(Message message); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterEndpoint.java new file mode 100644 index 0000000000..c6e3fe4757 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterEndpoint.java @@ -0,0 +1,96 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import java.util.List; + +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessagingException; +import org.springframework.util.Assert; + +/** + * @author Mark Fisher + */ +public class SplitterEndpoint extends AbstractEndpoint { + + private final Splitter splitter; + + + public SplitterEndpoint(Splitter splitter) { + Assert.notNull(splitter, "splitter must not be null"); + this.splitter = splitter; + } + + + // TODO: move to superclass + private MessageTarget resolveReplyTarget(Object returnAddress) { + MessageTarget replyTarget = this.getTarget(); + if (replyTarget == null && returnAddress != null) { + if (returnAddress instanceof MessageTarget) { + replyTarget = (MessageTarget) returnAddress; + } + else if (returnAddress instanceof String) { + ChannelRegistry channelRegistry = this.getChannelRegistry(); + if (channelRegistry != null) { + replyTarget = channelRegistry.lookupChannel((String) returnAddress); + } + } + } + if (replyTarget == null) { + throw new MessagingException("unable to resolve reply target"); + } + return replyTarget; + } + + @Override + protected boolean sendInternal(Message message) { + List> results = this.splitter.split(message); + if (results != null) { + for (Message splitMessage : results) { + MessageTarget replyTarget = this.resolveReplyTarget(message.getHeaders().getReturnAddress()); + this.getMessageExchangeTemplate().send(splitMessage, replyTarget); + } + return true; + } + return false; + } + + + // TODO: remove these methods after refactoring + + private volatile String inputChannelName; + + public String getInputChannelName() { + return this.inputChannelName; + } + + public void setInputChannelName(String inputChannelName) { + this.inputChannelName = inputChannelName; + } + + public String getOutputChannelName() { + if (this.getTarget() instanceof MessageChannel) { + return ((MessageChannel) this.getTarget()).getName(); + } + return null; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java index 54677102dc..c4c5501dcb 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java @@ -17,17 +17,13 @@ package org.springframework.integration.splitter; import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; -import java.util.StringTokenizer; -import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageHeaders; + +// TODO: remove this class after refactoring the annotation support /** * A {@link MessageHandler} implementation for splitting a single Message @@ -42,102 +38,30 @@ import org.springframework.integration.message.MessageHeaders; * * @author Mark Fisher */ -public class SplitterMessageHandler extends AbstractMessageHandler { +public class SplitterMessageHandler implements MessageHandler { - private volatile String delimiters; + private final Splitter splitter; public SplitterMessageHandler(Object object, Method method) { - super(object, method); + this.splitter = new MethodInvokingSplitter(object, method); } public SplitterMessageHandler(Object object, String methodName) { - super(object, methodName); + this.splitter = new MethodInvokingSplitter(object, methodName); } public SplitterMessageHandler() { - super(); + this.splitter = new DefaultSplitter(); } - /** - * Set delimiters to use for tokenizing String values. The default - * is null indicating that no tokenization should occur. - * If delimiters are provided, they will be applied to any String - * payload, or if an Object and Method have been provided, tokenization - * will be applied to any String return value from the invoked Method. - */ - public void setDelimiters(String delimiters) { - this.delimiters = delimiters; - } - - protected CompositeMessage createReplyMessage(Object result, Message requestMessage) { - MessageHeaders requestHeaders = requestMessage.getHeaders(); - List> results = new ArrayList>(); - if (result instanceof Collection) { - Collection items = (Collection) result; - int sequenceNumber = 0; - int sequenceSize = items.size(); - for (Object item : items) { - results.add(this.createSplitMessage(item, requestHeaders, ++sequenceNumber, sequenceSize)); - } - } - else if (result.getClass().isArray()) { - Object[] items = (Object[]) result; - int sequenceNumber = 0; - int sequenceSize = items.length; - for (Object item : items) { - results.add(this.createSplitMessage(item, requestHeaders, ++sequenceNumber, sequenceSize)); - } - } - else if (result instanceof String && this.delimiters != null) { - StringTokenizer tokenizer = new StringTokenizer((String) result, this.delimiters); - int sequenceNumber = 0; - int sequenceSize = tokenizer.countTokens(); - while (tokenizer.hasMoreElements()) { - results.add(this.createSplitMessage( - tokenizer.nextToken(), requestHeaders, ++sequenceNumber, sequenceSize)); - } - } - else { - results.add(this.createSplitMessage(result, requestHeaders, 1, 1)); - } - if (results.isEmpty()) { + public Message handle(Message message) { + List> results = this.splitter.split(message); + if (results == null || results.isEmpty()) { return null; } return new CompositeMessage(results); } - @Override - protected Message postProcessReplyMessage(Message replyMessage, Message requestMessage) { - Object requestId = requestMessage.getHeaders().getId(); - if (replyMessage instanceof CompositeMessage) { - List> sequentialMessages = new ArrayList>(); - List> replyList = ((CompositeMessage) replyMessage).getPayload(); - int sequenceSize = replyList.size(); - int sequenceNumber = 0; - for (Message message : replyList) { - sequentialMessages.add(this.setSplitMessageHeaders( - MessageBuilder.fromMessage(message), requestId, ++sequenceNumber, sequenceSize)); - } - return new CompositeMessage(sequentialMessages); - } - return this.setSplitMessageHeaders(MessageBuilder.fromMessage(replyMessage), requestId, 1, 1); - } - - private Message createSplitMessage(Object item, MessageHeaders requestHeaders, int sequenceNumber, int sequenceSize) { - if (item instanceof Message) { - return this.setSplitMessageHeaders(MessageBuilder.fromMessage((Message) item), - requestHeaders.getId(), sequenceNumber, sequenceSize); - } - return this.setSplitMessageHeaders(MessageBuilder.fromPayload(item), - requestHeaders.getId(), sequenceNumber, sequenceSize); - } - - private Message setSplitMessageHeaders(MessageBuilder builder, Object requestMessageId, int sequenceNumber, int sequenceSize) { - return builder.setCorrelationId(requestMessageId) - .setSequenceNumber(sequenceNumber) - .setSequenceSize(sequenceSize).build(); - } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java index 2f4968731f..42a8a16ad6 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java @@ -22,11 +22,11 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.endpoint.DefaultEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.splitter.SplitterMessageHandler; +import org.springframework.integration.splitter.MethodInvokingSplitter; +import org.springframework.integration.splitter.SplitterEndpoint; /** * @author Mark Fisher @@ -99,10 +99,10 @@ public class CorrelationIdTests { public void testCorrelationIdWithSplitter() throws Exception { Message message = new StringMessage("test1,test2"); QueueChannel testChannel = new QueueChannel(); - SplitterMessageHandler splitter = new SplitterMessageHandler( + MethodInvokingSplitter splitter = new MethodInvokingSplitter( new TestBean(), TestBean.class.getMethod("split", String.class)); - DefaultEndpoint endpoint = new DefaultEndpoint(splitter); - endpoint.setOutputChannel(testChannel); + SplitterEndpoint endpoint = new SplitterEndpoint(splitter); + endpoint.setTarget(testChannel); splitter.afterPropertiesSet(); endpoint.send(message); Message reply1 = testChannel.receive(100); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/DefaultSplitterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/DefaultSplitterTests.java new file mode 100644 index 0000000000..5ff50fb9b6 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/DefaultSplitterTests.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.splitter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; + +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageBuilder; + +/** + * @author Mark Fisher + */ +public class DefaultSplitterTests { + + @Test + public void splitMessageWithArrayPayload() throws Exception { + String[] payload = new String[] { "x", "y", "z" }; + Message message = MessageBuilder.fromPayload(payload).build(); + DefaultSplitter splitter = new DefaultSplitter(); + List> replies = splitter.split(message); + assertEquals(3, replies.size()); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("x", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("y", reply2.getPayload()); + Message reply3 = replies.get(2); + assertNotNull(reply3); + assertEquals("z", reply3.getPayload()); + } + + @Test + public void splitMessageWithCollectionPayload() throws Exception { + List payload = Arrays.asList(new String[] { "x", "y", "z" }); + Message> message = MessageBuilder.fromPayload(payload).build(); + DefaultSplitter splitter = new DefaultSplitter(); + List> replies = splitter.split(message); + assertEquals(3, replies.size()); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("x", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("y", reply2.getPayload()); + Message reply3 = replies.get(2); + assertNotNull(reply3); + assertEquals("z", reply3.getPayload()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/MethodInvokingSplitterTests.java similarity index 71% rename from org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterMessageHandlerTests.java rename to org.springframework.integration/src/test/java/org/springframework/integration/splitter/MethodInvokingSplitterTests.java index 501c58ad3c..6a94ca1513 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterMessageHandlerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/MethodInvokingSplitterTests.java @@ -27,7 +27,6 @@ import java.util.List; import org.junit.Test; import org.springframework.integration.annotation.Header; -import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; @@ -35,52 +34,16 @@ import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher */ -public class SplitterMessageHandlerTests { +public class MethodInvokingSplitterTests { private SplitterTestBean testBean = new SplitterTestBean(); - @Test - public void splitMessageWithArrayPayload() throws Exception { - String[] payload = new String[] { "x", "y", "z" }; - Message message = MessageBuilder.fromPayload(payload).build(); - SplitterMessageHandler handler = new SplitterMessageHandler(); - List> replies = invokeHandler(handler, message); - assertEquals(3, replies.size()); - Message reply1 = replies.get(0); - assertNotNull(reply1); - assertEquals("x", reply1.getPayload()); - Message reply2 = replies.get(1); - assertNotNull(reply2); - assertEquals("y", reply2.getPayload()); - Message reply3 = replies.get(2); - assertNotNull(reply3); - assertEquals("z", reply3.getPayload()); - } - - @Test - public void splitMessageWithCollectionPayload() throws Exception { - List payload = Arrays.asList(new String[] { "x", "y", "z" }); - Message> message = MessageBuilder.fromPayload(payload).build(); - SplitterMessageHandler handler = new SplitterMessageHandler(); - List> replies = invokeHandler(handler, message); - assertEquals(3, replies.size()); - Message reply1 = replies.get(0); - assertNotNull(reply1); - assertEquals("x", reply1.getPayload()); - Message reply2 = replies.get(1); - assertNotNull(reply2); - assertEquals("y", reply2.getPayload()); - Message reply3 = replies.get(2); - assertNotNull(reply3); - assertEquals("z", reply3.getPayload()); - } - @Test public void splitStringToStringArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("stringToStringArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("stringToStringArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -92,8 +55,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToStringList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("stringToStringList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("stringToStringList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -105,8 +68,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToStringArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("messageToStringArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("messageToStringArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -118,8 +81,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToStringList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("messageToStringList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("messageToStringList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -131,8 +94,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToMessageArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("messageToMessageArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("messageToMessageArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -144,8 +107,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToMessageList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("messageToMessageList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("messageToMessageList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -157,8 +120,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToMessageArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("stringToMessageArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("stringToMessageArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -170,8 +133,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToMessageList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("stringToMessageList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("stringToMessageList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -183,8 +146,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToStringArrayConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToStringArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToStringArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -196,8 +159,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToStringListConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToStringList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToStringList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -209,8 +172,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToStringArrayConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToStringArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToStringArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -222,8 +185,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToStringListConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToStringList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToStringList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -235,8 +198,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToMessageArrayConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToMessageArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToMessageArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -248,8 +211,8 @@ public class SplitterMessageHandlerTests { @Test public void splitMessageToMessageListConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToMessageList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "messageToMessageList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -261,8 +224,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToMessageArrayConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToMessageArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToMessageArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -274,8 +237,8 @@ public class SplitterMessageHandlerTests { @Test public void splitStringToMessageListConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToMessageList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, "stringToMessageList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -287,8 +250,8 @@ public class SplitterMessageHandlerTests { @Test public void testHeaderForObjectReturnValues() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("stringToStringArray"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("stringToStringArray"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals(new Integer(2), reply1.getHeaders().getSequenceSize()); @@ -304,8 +267,8 @@ public class SplitterMessageHandlerTests { @Test public void testHeaderForMessageReturnValues() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandler handler = this.getHandler("messageToMessageList"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("messageToMessageList"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals(new Integer(2), reply1.getHeaders().getSequenceSize()); @@ -322,8 +285,8 @@ public class SplitterMessageHandlerTests { public void splitMessageHeader() throws Exception { Message message = MessageBuilder.fromPayload("ignored") .setHeader("testHeader", "foo.bar").build(); - SplitterMessageHandler handler = this.getHandler("splitHeader"); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = this.getSplitter("splitHeader"); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); @@ -337,8 +300,8 @@ public class SplitterMessageHandlerTests { Message message = MessageBuilder.fromPayload("a.b") .setHeader("testHeader", "c.d").build(); Method splittingMethod = this.testBean.getClass().getMethod("splitPayloadAndHeader", String.class, String.class); - SplitterMessageHandler handler = new SplitterMessageHandler(testBean, splittingMethod); - List> replies = invokeHandler(handler, message); + MethodInvokingSplitter splitter = new MethodInvokingSplitter(testBean, splittingMethod); + List> replies = splitter.split(message); Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("a", reply1.getPayload()); @@ -354,14 +317,10 @@ public class SplitterMessageHandlerTests { } - private SplitterMessageHandler getHandler(String methodName) throws Exception { + private MethodInvokingSplitter getSplitter(String methodName) throws Exception { Class paramType = methodName.startsWith("message") ? Message.class : String.class; Method splittingMethod = this.testBean.getClass().getMethod(methodName, paramType); - return new SplitterMessageHandler(testBean, splittingMethod); - } - - private static List> invokeHandler(SplitterMessageHandler handler, Message message) { - return ((CompositeMessage) handler.handle(message)).getPayload(); + return new MethodInvokingSplitter(testBean, splittingMethod); }