diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AnnotationMethodPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AnnotationMethodPostProcessor.java index c28e086ddf..ef842de69b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AnnotationMethodPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AnnotationMethodPostProcessor.java @@ -16,7 +16,7 @@ package org.springframework.integration.config.annotation; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; /** * Strategy interface for post-processing annotated methods. @@ -27,7 +27,7 @@ public interface AnnotationMethodPostProcessor { Object postProcess(Object bean, String beanName, Class originalBeanClass); - AbstractEndpoint createEndpoint(Object bean, String beanName, Class originalBeanClass, + MessageEndpoint createEndpoint(Object bean, String beanName, Class originalBeanClass, org.springframework.integration.annotation.MessageEndpoint endpointAnnotation); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java index 2ea4e73ba8..097cf02b13 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java @@ -37,9 +37,9 @@ import org.springframework.integration.annotation.Splitter; import org.springframework.integration.annotation.Transformer; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.ChannelRegistryAware; -import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.ConcurrencyPolicy; -import org.springframework.integration.endpoint.HandlerEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.SimpleEndpoint; import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.MessageHandlerChain; @@ -129,9 +129,9 @@ public class HandlerAnnotationPostProcessor extends AbstractAnnotationMethodPost return handlerChain; } - public AbstractEndpoint createEndpoint(Object bean, String beanName, Class originalBeanClass, + public MessageEndpoint createEndpoint(Object bean, String beanName, Class originalBeanClass, org.springframework.integration.annotation.MessageEndpoint endpointAnnotation) { - HandlerEndpoint endpoint = new HandlerEndpoint((MessageHandler) bean); + SimpleEndpoint endpoint = new SimpleEndpoint((MessageHandler) bean); String outputChannelName = endpointAnnotation.output(); if (StringUtils.hasText(outputChannelName)) { endpoint.setOutputChannelName(outputChannelName); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java index 8805bb5fd1..5add354fc5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessor.java @@ -33,7 +33,6 @@ import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.dispatcher.PollingDispatcher; -import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; @@ -87,10 +86,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Init AnnotationMethodPostProcessor postProcessor = entry.getValue(); bean = postProcessor.postProcess(bean, beanName, beanClass); if (endpointAnnotation != null && entry.getKey().isAssignableFrom(bean.getClass())) { - AbstractEndpoint endpoint = + org.springframework.integration.endpoint.MessageEndpoint endpoint = postProcessor.createEndpoint(bean, beanName, beanClass, endpointAnnotation); if (endpoint != null) { - endpoint.setName(beanName + "." + entry.getKey().getSimpleName() + ".endpoint"); + endpoint.setBeanName(beanName + "." + entry.getKey().getSimpleName() + ".endpoint"); Poller pollerAnnotation = AnnotationUtils.findAnnotation(beanClass, Poller.class); if (pollerAnnotation != null) { PollingSchedule schedule = new PollingSchedule(pollerAnnotation.period()); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index 0c3911d6f6..de2e86c566 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -16,6 +16,7 @@ package org.springframework.integration.endpoint; +import org.springframework.beans.factory.BeanNameAware; import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.scheduling.Schedule; @@ -25,10 +26,12 @@ import org.springframework.integration.scheduling.Schedule; * * @author Mark Fisher */ -public interface MessageEndpoint extends MessageTarget { +public interface MessageEndpoint extends MessageTarget, BeanNameAware { String getName(); + void setSchedule(Schedule schedule); + Schedule getSchedule(); void setSource(MessageSource source); @@ -39,6 +42,8 @@ public interface MessageEndpoint extends MessageTarget { MessageTarget getTarget(); + void setInputChannelName(String inputChannelName); + String getInputChannelName(); String getOutputChannelName(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SimpleEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SimpleEndpoint.java index ab1cd214ba..a2832ed717 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SimpleEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SimpleEndpoint.java @@ -332,9 +332,10 @@ public class SimpleEndpoint implements MessageEndpoint /* TODO: remove the following methods after they are removed from the MessageEndpoint interface. */ - private String inputChannelName; - private String outputChannelName; - private MessageSource source; + private volatile String inputChannelName; + private volatile String outputChannelName; + private volatile MessageSource source; + private volatile Schedule schedule; public String getInputChannelName() { return this.inputChannelName; @@ -356,7 +357,11 @@ public class SimpleEndpoint implements MessageEndpoint } public Schedule getSchedule() { - return null; + return this.schedule; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; } public MessageSource getSource() { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java index e9c96fca66..eb45917c9a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java @@ -181,8 +181,8 @@ public abstract class AbstractMessageHandler implements MessageHandler, Initiali } } - public Message handle(Message message) { - if (message == null || message.getPayload() == null) { + public Message handle(Message requestMessage) { + if (requestMessage == null || requestMessage.getPayload() == null) { if (logger.isDebugEnabled()) { logger.debug("message handler received a null message"); } @@ -191,21 +191,22 @@ public abstract class AbstractMessageHandler implements MessageHandler, Initiali if (!this.initialized) { this.afterPropertiesSet(); } - Object result = (this.invoker != null) ? this.invokeHandlerMethod(message) : message.getPayload(); + Object result = (this.invoker != null) ? + this.invokeHandlerMethod(requestMessage) : requestMessage.getPayload(); if (result == null) { return null; } - return this.createReplyMessage(result, message.getHeaders()); + return this.createReplyMessage(result, requestMessage); } /** * Subclasses must implement this method to generate the reply Message. * * @param result the return value from an adapter method, or the Message payload if not acting as an adapter - * @param requestHeaders the MessageHeaders of the original request Message + * @param requestMessage the original request Message * @return the Message to be sent to the reply MessageTarget */ - protected abstract Message createReplyMessage(Object result, MessageHeaders requestHeaders); + protected abstract Message createReplyMessage(Object result, Message requestMessage); private Object invokeHandlerMethod(Message message) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DefaultMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DefaultMessageHandler.java index bf7778ddf7..3302845c03 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DefaultMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DefaultMessageHandler.java @@ -18,7 +18,6 @@ package org.springframework.integration.handler; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageHeaders; /** * The default MessageHandler implementation. Creates a Message for the reply payload. @@ -30,8 +29,11 @@ import org.springframework.integration.message.MessageHeaders; public class DefaultMessageHandler extends AbstractMessageHandler { @Override - protected Message createReplyMessage(Object result, MessageHeaders requestHeaders) { - return MessageBuilder.fromPayload(result).copyHeaders(requestHeaders).setCorrelationId(requestHeaders.getId()).build(); + protected Message createReplyMessage(Object result, Message requestMessage) { + return MessageBuilder.fromPayload(result) + .copyHeaders(requestMessage.getHeaders()) + .setCorrelationId(requestMessage.getHeaders().getId()) + .build(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/SplitterMessageHandlerAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/SplitterMessageHandlerAdapter.java deleted file mode 100644 index cb30f6d79a..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/SplitterMessageHandlerAdapter.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.router; - -import java.lang.reflect.Method; -import java.util.Collection; - -import org.springframework.integration.ConfigurationException; -import org.springframework.integration.annotation.Splitter; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.ChannelRegistryAware; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.handler.AbstractMessageHandlerAdapter; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; - -/** - * MessageHandler adapter for methods annotated with {@link Splitter @Splitter}. - * - * @author Mark Fisher - * @author Marius Bogoevici - */ -public class SplitterMessageHandlerAdapter extends AbstractMessageHandlerAdapter implements ChannelRegistryAware { - - private volatile String outputChannelName; - - private volatile long sendTimeout = -1; - - - public SplitterMessageHandlerAdapter(Object object, Method method) { - this.setObject(object); - this.setMethod(method); - if (method.getParameterTypes().length < 1) { - throw new ConfigurationException("The splitter method must accept at least one argument."); - } - if (method.getParameterTypes()[0].equals(Message.class)) { - this.setMethodExpectsMessage(true); - } - } - - public SplitterMessageHandlerAdapter(Object object, String methodName) { - this.setObject(object); - this.setMethodName(methodName); - } - - public SplitterMessageHandlerAdapter() { - } - - - public void setOutputChannelName(String outputChannelName) { - this.outputChannelName = outputChannelName; - } - - public void setSendTimeout(long sendTimeout) { - this.sendTimeout = sendTimeout; - } - - @Override - protected final Message handleReturnValue(Object returnValue, Message originalMessage) { - if (returnValue == null) { - if (logger.isWarnEnabled()) { - logger.warn("Splitter method returned null."); - } - return null; - } - if (returnValue instanceof Collection) { - Collection items = (Collection) returnValue; - int sequenceNumber = 0; - int sequenceSize = items.size(); - for (Object item : items) { - Message splitMessage = (item instanceof Message) ? - (Message) item : this.createReplyMessage(item, originalMessage); - splitMessage = MessageBuilder.fromMessage(splitMessage) - .setCorrelationId(originalMessage.getHeaders().getId()) - .setSequenceNumber(++sequenceNumber) - .setSequenceSize(sequenceSize).build(); - this.sendMessage(splitMessage, this.outputChannelName); - } - } - else if (returnValue.getClass().isArray()) { - Object[] array = (Object[]) returnValue; - int sequenceNumber = 0; - int sequenceSize = array.length; - for (Object item : array) { - Message splitMessage = (item instanceof Message) ? - (Message) item : this.createReplyMessage(item, originalMessage); - splitMessage = MessageBuilder.fromMessage(splitMessage) - .setCorrelationId(originalMessage.getHeaders().getId()) - .setSequenceNumber(++sequenceNumber) - .setSequenceSize(sequenceSize).build(); - this.sendMessage(splitMessage, this.outputChannelName); - } - } - else { - throw new ConfigurationException( - "splitter method must return either a Collection or array"); - } - return null; - } - - private boolean sendMessage(Message message, String channelName) { - ChannelRegistry channelRegistry = this.getChannelRegistry(); - if (channelRegistry == null) { - throw new IllegalStateException(this.getClass().getSimpleName() + " requires a ChannelRegistry reference."); - } - MessageChannel channel = channelRegistry.lookupChannel(channelName); - if (channel == null) { - if (logger.isWarnEnabled()) { - logger.warn("unable to resolve channel for name '" + channelName + "'"); - } - return false; - } - if (logger.isDebugEnabled()) { - logger.debug("sending message to channel '" + channelName + "'"); - } - return (this.sendTimeout < 0) ? channel.send(message) : channel.send(message, this.sendTimeout); - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/SplitterMessageHandlerCreator.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/SplitterMessageHandlerCreator.java index 596449df3c..7a1fd75453 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/config/SplitterMessageHandlerCreator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/config/SplitterMessageHandlerCreator.java @@ -25,7 +25,7 @@ import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.handler.AbstractMessageHandlerAdapter; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.config.AbstractMessageHandlerCreator; -import org.springframework.integration.router.SplitterMessageHandlerAdapter; +import org.springframework.integration.splitter.SplitterMessageHandler; /** * Creates a {@link MessageHandler} adapter for splitter methods. @@ -42,9 +42,7 @@ public class SplitterMessageHandlerCreator extends AbstractMessageHandlerCreator outputChannelName = endpointAnnotation.output(); } } - SplitterMessageHandlerAdapter adapter = new SplitterMessageHandlerAdapter(object, method); - adapter.setOutputChannelName(outputChannelName); - return adapter; + return new SplitterMessageHandler(object, method); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java index 2a10f08d48..24c6c12ac4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/SplitterMessageHandler.java @@ -71,8 +71,9 @@ public class SplitterMessageHandler extends AbstractMessageHandler { this.delimiters = delimiters; } - protected CompositeMessage createReplyMessage(Object result, MessageHeaders requestHeaders) { - List> results = new ArrayList>(); + protected CompositeMessage createReplyMessage(Object result, Message requestMessage) { + MessageHeaders requestHeaders = requestMessage.getHeaders(); + List> results = new ArrayList>(); if (result instanceof Collection) { Collection items = (Collection) result; int sequenceNumber = 0; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index 5302aa2fa0..bbabd8b22b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -30,7 +30,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.config.MessageBusParser; -import org.springframework.integration.endpoint.HandlerEndpoint; +import org.springframework.integration.endpoint.SimpleEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.router.AggregatingMessageHandler; import org.springframework.integration.router.CompletionStrategyAdapter; @@ -111,8 +111,8 @@ public class AggregatorAnnotationTests { @SuppressWarnings("unchecked") private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) { MessageBus messageBus = this.getMessageBus(context); - HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint(endpointName + ".MessageHandler.endpoint"); - MessageHandler handler = endpoint.getHandler(); + SimpleEndpoint endpoint = (SimpleEndpoint) messageBus.lookupEndpoint(endpointName + ".MessageHandler.endpoint"); + MessageHandler handler = (MessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler"); try { if (AopUtils.isAopProxy(handler)) { DelegatingIntroductionInterceptor interceptor = (DelegatingIntroductionInterceptor) @@ -124,7 +124,7 @@ public class AggregatorAnnotationTests { catch (Exception e) { // will return the accessor for the handler } - return new DirectFieldAccessor(endpoint.getHandler()); + return new DirectFieldAccessor(handler); } private MessageBus getMessageBus(ApplicationContext context) { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index e9d75e557a..711dc252c5 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -56,7 +56,7 @@ import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.EndpointInterceptor; -import org.springframework.integration.endpoint.HandlerEndpoint; +import org.springframework.integration.endpoint.SimpleEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -185,7 +185,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); - HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); + SimpleEndpoint endpoint = (SimpleEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); List interceptors = (List) accessor.getPropertyValue("interceptors"); assertEquals(1, interceptors.size()); @@ -363,7 +363,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation(); postProcessor.postProcessAfterInitialization(endpoint, "testBean"); - HandlerEndpoint processedEndpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); + SimpleEndpoint processedEndpoint = (SimpleEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); Schedule schedule = processedEndpoint.getSchedule(); assertEquals(PollingSchedule.class, schedule.getClass()); PollingSchedule pollingSchedule = (PollingSchedule) schedule; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java index 3c3bf1b885..1d7f0cdba4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java @@ -21,13 +21,12 @@ import static org.junit.Assert.assertTrue; import org.junit.Test; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.SimpleEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.router.SplitterMessageHandlerAdapter; +import org.springframework.integration.splitter.SplitterMessageHandler; /** * @author Mark Fisher @@ -112,19 +111,13 @@ public class CorrelationIdTests { @Test public void testCorrelationIdWithSplitter() throws Exception { Message message = new StringMessage("test1,test2"); - DefaultMessageHandlerAdapter adapter = new DefaultMessageHandlerAdapter(); - adapter.setObject(new TestBean()); - adapter.setMethodName("upperCase"); - adapter.afterPropertiesSet(); QueueChannel testChannel = new QueueChannel(); - ChannelRegistry channelRegistry = new DefaultChannelRegistry(); - channelRegistry.registerChannel("testChannel", testChannel); - SplitterMessageHandlerAdapter splitter = new SplitterMessageHandlerAdapter( + SplitterMessageHandler splitter = new SplitterMessageHandler( new TestBean(), TestBean.class.getMethod("split", String.class)); - splitter.setOutputChannelName("testChannel"); - splitter.setChannelRegistry(channelRegistry); + SimpleEndpoint endpoint = new SimpleEndpoint(splitter); + endpoint.setOutputChannel(testChannel); splitter.afterPropertiesSet(); - splitter.handle(message); + endpoint.send(message); Message reply1 = testChannel.receive(100); Message reply2 = testChannel.receive(100); assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerAdapterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerTests.java similarity index 50% rename from org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerAdapterTests.java rename to org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerTests.java index 1d7384ac94..e1a457e9c2 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerAdapterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/SplitterMessageHandlerTests.java @@ -26,192 +26,223 @@ import java.util.List; import org.junit.Test; -import org.springframework.integration.channel.ChannelRegistry; -import org.springframework.integration.channel.DefaultChannelRegistry; -import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; +import org.springframework.integration.splitter.SplitterMessageHandler; /** * @author Mark Fisher */ -public class SplitterMessageHandlerAdapterTests { - - private QueueChannel testChannel = new QueueChannel(); - - private ChannelRegistry channelRegistry = new DefaultChannelRegistry(); +public class SplitterMessageHandlerTests { private SplitterTestBean testBean = new SplitterTestBean(); - public SplitterMessageHandlerAdapterTests() { - this.channelRegistry.registerChannel("testChannel", testChannel); - } - - @Test - public void testSplitPayloadToStringArray() throws Exception { + public void splitStringToStringArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("stringToStringArray"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("stringToStringArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitPayloadToStringList() throws Exception { + public void splitStringToStringList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("stringToStringList"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("stringToStringList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitMessageToStringArray() throws Exception { + public void splitMessageToStringArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("messageToStringArray"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("messageToStringArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitMessageToStringList() throws Exception { + public void splitMessageToStringList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("messageToStringList"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("messageToStringList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitMessageToMessageArray() throws Exception { + public void splitMessageToMessageArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("messageToMessageArray"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("messageToMessageArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitMessageToMessageList() throws Exception { + public void splitMessageToMessageList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("messageToMessageList"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("messageToMessageList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitStringToMessageArray() throws Exception { + public void splitStringToMessageArray() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("stringToMessageArray"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("stringToMessageArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitStringToMessageList() throws Exception { + public void splitStringToMessageList() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("stringToMessageList"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("stringToMessageList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); - assertNotNull(reply2); - assertEquals("bar", reply2.getPayload()); - } - - @Test(expected=MessagingException.class) - public void testInvalidReturnType() throws Exception { - Method splittingMethod = this.testBean.getClass().getMethod("invalidParameterCount", String.class, String.class); - SplitterMessageHandlerAdapter adapter = new SplitterMessageHandlerAdapter(testBean, splittingMethod); - adapter.setOutputChannelName("testChannel"); - adapter.setChannelRegistry(channelRegistry); - adapter.afterPropertiesSet(); - StringMessage message = new StringMessage("foo.bar"); - adapter.handle(message); - } - - @Test - public void testSplitPayloadToStringArrayConfiguredByMethodName() throws Exception { - StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = - new SplitterMessageHandlerAdapter(testBean, "stringToStringArray"); - adapter.setOutputChannelName("testChannel"); - adapter.setChannelRegistry(channelRegistry); - adapter.afterPropertiesSet(); - adapter.handle(message); - Message reply1 = testChannel.receive(0); - assertNotNull(reply1); - assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitMessageToStringArrayConfiguredByMethodName() throws Exception { + public void splitStringToStringArrayConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = - new SplitterMessageHandlerAdapter(testBean, "messageToStringArray"); - adapter.setOutputChannelName("testChannel"); - adapter.setChannelRegistry(channelRegistry); - adapter.afterPropertiesSet(); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToStringArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @Test - public void testSplitStringToMessageListConfiguredByMethodName() throws Exception { + public void splitStringToStringListConfiguredByMethodName() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = - new SplitterMessageHandlerAdapter(testBean, "stringToMessageList"); - adapter.setOutputChannelName("testChannel"); - adapter.setChannelRegistry(channelRegistry); - adapter.afterPropertiesSet(); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToStringList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals("foo", reply1.getPayload()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitMessageToStringArrayConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToStringArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitMessageToStringListConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToStringList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitMessageToMessageArrayConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToMessageArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitMessageToMessageListConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "messageToMessageList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitStringToMessageArrayConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToMessageArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); + assertNotNull(reply2); + assertEquals("bar", reply2.getPayload()); + } + + @Test + public void splitStringToMessageListConfiguredByMethodName() throws Exception { + StringMessage message = new StringMessage("foo.bar"); + SplitterMessageHandler handler = new SplitterMessageHandler(testBean, "stringToMessageList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); + assertNotNull(reply1); + assertEquals("foo", reply1.getPayload()); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals("bar", reply2.getPayload()); } @@ -219,14 +250,14 @@ public class SplitterMessageHandlerAdapterTests { @Test public void testHeaderForObjectReturnValues() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("stringToStringArray"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("stringToStringArray"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals(new Integer(2), reply1.getHeaders().getSequenceSize()); assertEquals(new Integer(1), reply1.getHeaders().getSequenceNumber()); assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals(new Integer(2), reply2.getHeaders().getSequenceSize()); assertEquals(new Integer(2), reply2.getHeaders().getSequenceNumber()); @@ -236,14 +267,14 @@ public class SplitterMessageHandlerAdapterTests { @Test public void testHeaderForMessageReturnValues() throws Exception { StringMessage message = new StringMessage("foo.bar"); - SplitterMessageHandlerAdapter adapter = this.getAdapter("messageToMessageList"); - adapter.handle(message); - Message reply1 = testChannel.receive(0); + SplitterMessageHandler handler = this.getHandler("messageToMessageList"); + List> replies = invokeHandler(handler, message); + Message reply1 = replies.get(0); assertNotNull(reply1); assertEquals(new Integer(2), reply1.getHeaders().getSequenceSize()); assertEquals(new Integer(1), reply1.getHeaders().getSequenceNumber()); assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId()); - Message reply2 = testChannel.receive(0); + Message reply2 = replies.get(1); assertNotNull(reply2); assertEquals(new Integer(2), reply2.getHeaders().getSequenceSize()); assertEquals(new Integer(2), reply2.getHeaders().getSequenceNumber()); @@ -251,15 +282,14 @@ public class SplitterMessageHandlerAdapterTests { } - private SplitterMessageHandlerAdapter getAdapter(String methodName) throws Exception { + private SplitterMessageHandler getHandler(String methodName) throws Exception { Class paramType = methodName.startsWith("message") ? Message.class : String.class; Method splittingMethod = this.testBean.getClass().getMethod(methodName, paramType); - SplitterMessageHandlerAdapter adapter = - new SplitterMessageHandlerAdapter(testBean, splittingMethod); - adapter.setOutputChannelName("testChannel"); - adapter.setChannelRegistry(channelRegistry); - adapter.afterPropertiesSet(); - return adapter; + return new SplitterMessageHandler(testBean, splittingMethod); + } + + private static List> invokeHandler(SplitterMessageHandler handler, Message message) { + return ((CompositeMessage) handler.handle(message)).getPayload(); } @@ -316,10 +346,6 @@ public class SplitterMessageHandlerAdapterTests { } return messages; } - - public String[] invalidParameterCount(String param1, String param2) { - return null; - } } }