From 27e288be08d4ccdf4bbb3e6a5bb3776963b1dfed Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Mon, 6 Oct 2008 17:24:46 +0000 Subject: [PATCH] Refactored existing Message-consuming endpoints to only implement MessageConsumer (not MessageEndpoint). Now, either a PollingConsumerEndpoint or SubscribingConsumerEndpoint delegates to the MessageConsumer thereby separating the Lifecycle responsibilities and configuration settings (trigger, transactions, etc) since they are different for polling vs. subscribing and not relevant for simply consuming Messages. Essentially all MessageConsumers are now "event-driven" since a "polling consumer" is actually handled by the PollingConsumerEndpoint class. The next refactoring step involves renaming several components to clarify this endpoint vs. consumer distinction. --- .../AbstractRemotingOutboundGateway.java | 4 - .../config/AbstractRemotingGatewayParser.java | 5 +- ...AbstractRemotingOutboundGatewayParser.java | 48 +++++- ...FileOutboundChannelAdapterParserTests.java | 6 +- .../HttpInvokerOutboundGatewayParser.java | 2 +- ...HttpInvokerOutboundGatewayParserTests.java | 6 +- .../rmi/config/RmiOutboundGatewayParser.java | 16 +- .../config/RmiOutboundGatewayParserTests.java | 5 +- .../ws/config/WebServiceHandlerParser.java | 43 ++---- .../config/WebServiceHandlerParserTests.java | 87 +++++++---- .../AbstractMessageBarrierEndpoint.java | 54 ++++--- .../config/AbstractEndpointParser.java | 97 +++++------- .../AbstractOutboundChannelAdapterParser.java | 16 +- .../integration/config/AggregatorParser.java | 49 +++--- .../integration/config/FilterParser.java | 16 +- .../integration/config/ResequencerParser.java | 43 +++--- .../integration/config/RouterParser.java | 18 +-- .../config/ServiceActivatorParser.java | 17 ++- .../integration/config/SplitterParser.java | 22 ++- .../integration/config/TransformerParser.java | 16 +- ...AbstractMethodAnnotationPostProcessor.java | 82 +++++++---- .../AggregatorAnnotationPostProcessor.java | 46 ++---- ...ChannelAdapterAnnotationPostProcessor.java | 17 ++- .../RouterAnnotationPostProcessor.java | 24 +-- ...rviceActivatorAnnotationPostProcessor.java | 16 +- .../SplitterAnnotationPostProcessor.java | 15 +- .../TransformerAnnotationPostProcessor.java | 15 +- .../AbstractMessageConsumingEndpoint.java | 139 ++---------------- .../AbstractMessageHandlingEndpoint.java | 16 +- .../endpoint/AbstractPollingEndpoint.java | 5 +- .../endpoint/ServiceActivatorEndpoint.java | 6 +- .../config/ConsumerEndpointFactoryBean.java | 14 +- .../gateway/AbstractMessagingGateway.java | 29 +++- .../integration/router/RouterEndpoint.java | 9 +- .../config/AbstractTransformerParser.java | 15 +- .../aggregator/AggregatorEndpointTests.java | 4 +- .../aggregator/ResequencerEndpointTests.java | 4 +- .../bus/DefaultMessageBusTests.java | 45 +++--- .../bus/DirectChannelSubscriptionTests.java | 18 +-- .../integration/bus/messageBusTests.xml | 8 +- .../channel/MessageChannelTemplateTests.java | 7 +- .../config/AggregatorParserTests.java | 39 ++--- .../config/ChannelAdapterParserTests.java | 6 +- .../config/EndpointParserTests.java | 9 +- .../config/ResequencerParserTests.java | 10 +- .../config/aggregatorParserTests.xml | 13 +- .../annotation/AggregatorAnnotationTests.java | 6 +- ...MessagingAnnotationPostProcessorTests.java | 16 +- .../config/endpointWithSelector.xml | 4 +- .../endpoint/CorrelationIdTests.java | 30 ++-- ...ServiceActivatorMethodResolutionTests.java | 14 +- .../filter/FilterEndpointTests.java | 9 +- .../message/MethodInvokingConsumerTests.java | 6 +- .../SplitterIntegrationTests-context.xml | 7 +- 54 files changed, 591 insertions(+), 682 deletions(-) diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java index bf823a2725..b4db69d314 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java @@ -39,10 +39,6 @@ public abstract class AbstractRemotingOutboundGateway extends AbstractMessageHan } - public void setRequestChannel(MessageChannel requestChannel) { - this.setInputChannel(requestChannel); - } - public void setReplyChannel(MessageChannel replyChannel) { this.setOutputChannel(replyChannel); } diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java index d1d6bc6537..73945fe5f0 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -33,7 +34,9 @@ import org.springframework.util.StringUtils; */ public abstract class AbstractRemotingGatewayParser extends AbstractSimpleBeanDefinitionParser { - protected abstract Class getBeanClass(Element element); + protected Class getBeanClass(Element element) { + return ConsumerEndpointFactoryBean.class; + } @Override diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java index ddf3f52815..b07dd5d7c7 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java @@ -18,26 +18,64 @@ package org.springframework.integration.adapter.config; import org.w3c.dom.Element; +import org.springframework.beans.factory.BeanDefinitionStoreException; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.AbstractEndpointParser; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * Base class for url-based remoting outbound gateway parsers. * * @author Mark Fisher */ -public abstract class AbstractRemotingOutboundGatewayParser extends AbstractRemotingGatewayParser { +public abstract class AbstractRemotingOutboundGatewayParser extends AbstractEndpointParser { + + protected abstract Class getGatewayClass(Element element); @Override - protected boolean isEligibleAttribute(String attributeName) { - return !attributeName.equals("url") && super.isEligibleAttribute(attributeName); + protected String getInputChannelAttributeName() { + return "request-channel"; } @Override - protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { + protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) + throws BeanDefinitionStoreException { + String id = super.resolveId(element, definition, parserContext); + if (!StringUtils.hasText(id)) { + id = element.getAttribute("name"); + } + if (!StringUtils.hasText(id)) { + id = parserContext.getReaderContext().generateBeanName(definition); + } + return id; + } + + @Override + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(this.getGatewayClass(element)); + String url = this.parseUrl(element); + builder.addConstructorArgValue(url); + String replyChannel = element.getAttribute("reply-channel"); + if (StringUtils.hasText(replyChannel)) { + builder.addPropertyReference("replyChannel", replyChannel); + } + this.postProcessGateway(builder, element); + return builder; + } + + protected String parseUrl(Element element) { String url = element.getAttribute("url"); Assert.hasText(url, "The 'url' attribute is required."); - builder.addConstructorArgValue(url); + return url; + } + + /** + * Subclasses may override this method for additional configuration. + */ + protected void postProcessGateway(BeanDefinitionBuilder builder, Element element) { } } diff --git a/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java b/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java index 2fb35dd2b9..208481e1c4 100644 --- a/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java +++ b/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java @@ -27,7 +27,7 @@ import org.junit.runner.RunWith; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.file.DefaultFileNameGenerator; import org.springframework.integration.file.FileWritingMessageConsumer; import org.springframework.test.context.ContextConfiguration; @@ -43,11 +43,11 @@ public class FileOutboundChannelAdapterParserTests { @Autowired @Qualifier("simpleAdapter") - OutboundChannelAdapter simpleAdapter; + SubscribingConsumerEndpoint simpleAdapter; @Autowired @Qualifier("adapterWithCustomNameGenerator") - OutboundChannelAdapter adapterWithCustomNameGenerator; + SubscribingConsumerEndpoint adapterWithCustomNameGenerator; @Test diff --git a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java index 17bcd4e806..9941798051 100644 --- a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java +++ b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java @@ -29,7 +29,7 @@ import org.springframework.integration.httpinvoker.HttpInvokerOutboundGateway; public class HttpInvokerOutboundGatewayParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return HttpInvokerOutboundGateway.class; } diff --git a/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java b/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java index 89191765bb..838bde8d01 100644 --- a/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java +++ b/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.httpinvoker.HttpInvokerOutboundGateway; /** @@ -33,7 +35,9 @@ public class HttpInvokerOutboundGatewayParserTests { public void testHttpInvokerOutboundGatewayParser() { ApplicationContext context = new ClassPathXmlApplicationContext( "httpInvokerOutboundGatewayParserTests.xml", this.getClass()); - Object gateway = context.getBean("gateway"); + Object endpoint = context.getBean("gateway"); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); assertEquals(HttpInvokerOutboundGateway.class, gateway.getClass()); } diff --git a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java index d0ffb58ae5..afe8a84215 100644 --- a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java +++ b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java @@ -20,7 +20,6 @@ import java.rmi.registry.Registry; import org.w3c.dom.Element; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.integration.adapter.config.AbstractRemotingOutboundGatewayParser; import org.springframework.integration.rmi.RmiInboundGateway; import org.springframework.integration.rmi.RmiOutboundGateway; @@ -35,28 +34,19 @@ import org.springframework.util.StringUtils; public class RmiOutboundGatewayParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return RmiOutboundGateway.class; } @Override - protected boolean isEligibleAttribute(String attributeName) { - return !"host".equals(attributeName) - && !"port".equals(attributeName) - && !"remote-channel".equals(attributeName) - && super.isEligibleAttribute(attributeName); - } - - @Override - protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { + protected String parseUrl(Element element) { String host = element.getAttribute("host"); String remoteChannel = element.getAttribute("remote-channel"); Assert.isTrue(StringUtils.hasText(host) && StringUtils.hasText(remoteChannel), "The 'host' and 'remote-channel' attributes are both required"); String portAttribute = element.getAttribute("port"); String port = StringUtils.hasText(portAttribute) ? portAttribute : "" + Registry.REGISTRY_PORT; - String url = "rmi://" + host + ":" + port + "/" + RmiInboundGateway.SERVICE_NAME_PREFIX + remoteChannel; - builder.addConstructorArgValue(url); + return "rmi://" + host + ":" + port + "/" + RmiInboundGateway.SERVICE_NAME_PREFIX + remoteChannel; } } diff --git a/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java b/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java index 9a674e4272..af59956052 100644 --- a/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java +++ b/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java @@ -29,7 +29,6 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; import org.springframework.integration.rmi.RmiInboundGateway; -import org.springframework.integration.rmi.RmiOutboundGateway; /** * @author Mark Fisher @@ -53,8 +52,8 @@ public class RmiOutboundGatewayParserTests { public void directInvocation() { ApplicationContext context = new ClassPathXmlApplicationContext( "rmiOutboundGatewayParserTests.xml", this.getClass()); - RmiOutboundGateway gateway = (RmiOutboundGateway) context.getBean("gateway"); - gateway.handle(new StringMessage("test")); + MessageChannel localChannel = (MessageChannel) context.getBean("localChannel"); + localChannel.send(new StringMessage("test")); Message result = testChannel.receive(1000); assertNotNull(result); assertEquals("test", result.getPayload()); diff --git a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java index ebf2740999..95c882541d 100644 --- a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java +++ b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java @@ -19,11 +19,10 @@ package org.springframework.integration.ws.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; -import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.ConfigurationException; +import org.springframework.integration.adapter.config.AbstractRemotingOutboundGatewayParser; import org.springframework.integration.ws.handler.MarshallingWebServiceHandler; import org.springframework.integration.ws.handler.SimpleWebServiceHandler; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -31,37 +30,28 @@ import org.springframework.util.StringUtils; * * @author Mark Fisher */ -public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser { +public class WebServiceHandlerParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return (StringUtils.hasText(element.getAttribute("marshaller"))) ? MarshallingWebServiceHandler.class : SimpleWebServiceHandler.class; } @Override - protected boolean shouldGenerateId() { - return false; + protected String getInputChannelAttributeName() { + return "input-channel"; } @Override - protected boolean shouldGenerateIdAsFallback() { - return true; - } - - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected String parseUrl(Element element) { String uri = element.getAttribute("uri"); - if (!StringUtils.hasText(uri)) { - throw new ConfigurationException("The 'uri' attribute is required."); - } - builder.addConstructorArgValue(uri); - String inputChannel = element.getAttribute("input-channel"); - builder.addPropertyReference("inputChannel", inputChannel); - String outputChannel = element.getAttribute("output-channel"); - if (StringUtils.hasText(outputChannel)) { - builder.addPropertyReference("outputChannel", outputChannel); - } + Assert.hasText(uri, "The 'uri' attribute is required."); + return uri; + } + + @Override + protected void postProcessGateway(BeanDefinitionBuilder builder, Element element) { String marshallerRef = element.getAttribute("marshaller"); if (StringUtils.hasText(marshallerRef)) { builder.addConstructorArgReference(marshallerRef); @@ -91,13 +81,10 @@ public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser if (StringUtils.hasText(faultMessageResolverRef)) { builder.addPropertyReference("faultMessageResolver", faultMessageResolverRef); } - String messageSenderRef = element.getAttribute("message-sender"); String messageSenderListRef = element.getAttribute("message-senders"); - if(StringUtils.hasText(messageSenderRef) && StringUtils.hasText(messageSenderListRef)){ - throw new ConfigurationException("Only one of message-sender or mesage-senders should be specified"); - } - + Assert.isTrue(!(StringUtils.hasText(messageSenderRef) && StringUtils.hasText(messageSenderListRef)), + "Only one of message-sender or message-senders should be specified"); if (StringUtils.hasText(messageSenderRef)) { builder.addPropertyReference("messageSender", messageSenderRef); } diff --git a/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java b/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java index 66d30177e6..acd01b426c 100644 --- a/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java +++ b/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java @@ -24,6 +24,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.ws.handler.MarshallingWebServiceHandler; import org.springframework.integration.ws.handler.SimpleWebServiceHandler; import org.springframework.oxm.Marshaller; @@ -44,8 +45,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithDefaultSourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); assertEquals("DefaultSourceExtractor", accessor.getPropertyValue("sourceExtractor").getClass().getSimpleName()); } @@ -54,8 +57,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomSourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); SourceExtractor sourceExtractor = (SourceExtractor) context.getBean("sourceExtractor"); assertEquals(sourceExtractor, accessor.getPropertyValue("sourceExtractor")); } @@ -65,8 +70,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomRequestCallback"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); WebServiceMessageCallback callback = (WebServiceMessageCallback) context.getBean("requestCallback"); assertEquals(callback, accessor.getPropertyValue("requestCallback")); } @@ -76,8 +83,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageFactory"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageFactory factory = (WebServiceMessageFactory) context.getBean("messageFactory"); assertEquals(factory, accessor.getPropertyValue("messageFactory")); @@ -89,8 +98,10 @@ public class WebServiceHandlerParserTests { "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomSourceExtractorAndMessageFactory"); SourceExtractor sourceExtractor = (SourceExtractor) context.getBean("sourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); assertEquals(sourceExtractor, accessor.getPropertyValue("sourceExtractor")); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageFactory factory = (WebServiceMessageFactory) context.getBean("messageFactory"); @@ -102,8 +113,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomFaultMessageResolver"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); FaultMessageResolver resolver = (FaultMessageResolver) context.getBean("faultMessageResolver"); assertEquals(resolver, accessor.getPropertyValue("faultMessageResolver")); @@ -115,8 +128,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageSender"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageSender messageSender = (WebServiceMessageSender) context.getBean("messageSender"); assertEquals(messageSender, ((WebServiceMessageSender[])accessor.getPropertyValue("messageSenders"))[0]); @@ -126,8 +141,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageSenderList"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageSender messageSender = (WebServiceMessageSender) context.getBean("messageSender"); assertEquals(messageSender, ((WebServiceMessageSender[])accessor.getPropertyValue("messageSenders"))[0]); @@ -139,10 +156,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithAllInOneMarshaller"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshallerAndUnmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); assertEquals(marshaller, templateAccessor.getPropertyValue("unmarshaller")); @@ -153,10 +172,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithSeparateMarshallerAndUnmarshaller"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshaller"); Unmarshaller unmarshaller = (Unmarshaller) context.getBean("unmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); @@ -168,8 +189,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomRequestCallback"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); WebServiceMessageCallback callback = (WebServiceMessageCallback) context.getBean("requestCallback"); assertEquals(callback, accessor.getPropertyValue("requestCallback")); } @@ -179,10 +202,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithAllInOneMarshallerAndMessageFactory"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshallerAndUnmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); assertEquals(marshaller, templateAccessor.getPropertyValue("unmarshaller")); @@ -195,10 +220,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithSeparateMarshallerAndUnmarshallerAndMessageFactory"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshaller"); Unmarshaller unmarshaller = (Unmarshaller) context.getBean("unmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java index a69ab622af..1eaf4bdcd6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java @@ -27,26 +27,29 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.BlockingChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; /** - * Base class for {@link MessageBarrier}-based MessageHandlers. - * A {@link MessageEndpoint} implementation that waits for a group of + * Base class for {@link MessageBarrier}-based Message Consumers. + * A {@link MessageConsumer} implementation that waits for a group of * {@link Message Messages} to arrive and processes them together. - * Uses a {@link MessageBarr ier} to store messages and to decide how + * Uses a {@link MessageBarrier} to store messages and to decide how * the messages should be released. *

- * Each {@link Message} that is received by this endpoint will be associated with + * Each {@link Message} that is received by this consumer will be associated with * a group based upon the 'correlationId' property of its * header. If no such property is available, a {@link MessageHandlingException} * will be thrown. @@ -60,7 +63,7 @@ import org.springframework.util.ObjectUtils; * @author Mark Fisher * @author Marius Bogoevici */ -public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHandlingEndpoint implements TaskSchedulerAware { +public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHandlingEndpoint implements TaskSchedulerAware, InitializingBean { public final static long DEFAULT_SEND_TIMEOUT = 1000; @@ -91,8 +94,11 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHand private volatile boolean initialized; + private TaskScheduler taskScheduler; + private ScheduledFuture reaperFutureTask; + /** * Specify a channel for sending Messages that arrive after their aggregation * group has either completed or timed-out. @@ -141,26 +147,32 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHand this.timeout = timeout; } - /** - * Initialize this endpoint. - */ - @Override - protected void initialize() throws Exception { - super.initialize(); + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.taskScheduler = taskScheduler; + } + + public void afterPropertiesSet() { this.trackedCorrelationIds = new ArrayBlockingQueue(this.trackedCorrelationIdCapacity); this.initialized = true; } - - @Override - protected void onStart() { - super.onStart(); - this.reaperFutureTask = this.getTaskScheduler().schedule(new ReaperTask(), new IntervalTrigger(reaperInterval, TimeUnit.MILLISECONDS)); + + public boolean isRunning() { + return this.reaperFutureTask != null; } - @Override - protected void onStop() { - super.onStop(); - this.reaperFutureTask.cancel(true); + public void start() { + if (this.isRunning()) { + return; + } + Assert.state(this.taskScheduler != null, "TaskScheduler must not be null"); + this.reaperFutureTask = this.taskScheduler.schedule( + new ReaperTask(), new IntervalTrigger(this.reaperInterval, TimeUnit.MILLISECONDS)); + } + + public void stop() { + if (this.isRunning()) { + this.reaperFutureTask.cancel(true); + } } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java index 5825091ca4..05acc20710 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java @@ -18,13 +18,11 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.config.BeanDefinitionHolder; -import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; @@ -40,8 +38,6 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio protected static final String METHOD_ATTRIBUTE = "method"; - protected static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; - protected static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; private static final String POLLER_ELEMENT = "poller"; @@ -51,20 +47,9 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio private static final String ERROR_HANDLER_ATTRIBUTE = "error-handler"; - /** - * Subclasses may override this method to determine whether the endpoint - * type should create an adapter. If so, the "ref" attribute will be - * required, and the "method" attribute will typically be used as well. - * - *

The default is true. - */ - protected boolean shouldCreateAdapter(Element element) { - return true; - } - @Override protected final Class getBeanClass(Element element) { - return this.getEndpointClass(); + return ConsumerEndpointFactoryBean.class; } @Override @@ -77,22 +62,42 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio return true; } - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - if (this.shouldCreateAdapter(element)) { - String ref = element.getAttribute(REF_ATTRIBUTE); - Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required."); - if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { - String method = element.getAttribute(METHOD_ATTRIBUTE); - String adapterBeanName = this.parseAdapter(ref, method, element, parserContext); - builder.addConstructorArgReference(adapterBeanName); - } - else { - builder.addConstructorArgReference(ref); - } + /** + * Parse the MessageConsumer. + */ + protected abstract BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext); + + protected String getInputChannelAttributeName() { + return "input-channel"; + } + + protected String parseAdapter(Element element, ParserContext parserContext, Class adapterClass) { + String ref = element.getAttribute(REF_ATTRIBUTE); + Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required."); + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(adapterClass); + String method = element.getAttribute(METHOD_ATTRIBUTE); + builder.addConstructorArgReference(ref); + builder.addConstructorArgValue(method); + return BeanDefinitionReaderUtils.registerWithGeneratedName( + builder.getBeanDefinition(), parserContext.getRegistry()); } - String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE); - Assert.hasText(inputChannel, "the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required"); + return ref; + } + + @Override + protected final void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + BeanDefinitionBuilder consumerBuilder = this.parseConsumer(element, parserContext); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, OUTPUT_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, SELECTOR_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, ERROR_HANDLER_ATTRIBUTE); + String consumerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( + consumerBuilder.getBeanDefinition(), parserContext.getRegistry()); + builder.addConstructorArgReference(consumerBeanName); + String inputChannelAttributeName = this.getInputChannelAttributeName(); + String inputChannelName = element.getAttribute(inputChannelAttributeName); + Assert.hasText(inputChannelName, "the '" + inputChannelAttributeName + "' attribute is required"); + builder.addPropertyValue("inputChannelName", inputChannelName); Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); if (pollerElement != null) { IntegrationNamespaceUtils.configureTrigger(pollerElement, builder); @@ -102,41 +107,13 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio } IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, pollerElement, "task-executor"); } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, SELECTOR_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); this.postProcess(element, parserContext, builder); } - private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { - Class adapterClass = this.getMethodInvokingAdapterClass(); - Assert.state(adapterClass != null, - "Parser needs to create an adapter but 'getMethodInvokingAdapterClass()' returned null."); - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(adapterClass); - builder.addConstructorArgReference(ref); - builder.addConstructorArgValue(method); - String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName( - builder.getBeanDefinition(), parserContext.getRegistry()); - BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName); - parserContext.registerBeanComponent(new BeanComponentDefinition(holder)); - return adapterBeanName; - } - /** * Subclasses may implement this method to provide additional configuration. */ protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { } - /** - * Subclasses must override this if the adapted object is created from - * the "ref" and "method" attribute values. - */ - protected Class getMethodInvokingAdapterClass() { - return null; - } - - protected abstract Class getEndpointClass(); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java index fa37eb762f..2796403a5f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java @@ -22,7 +22,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.xml.DomUtils; @@ -36,20 +36,18 @@ public abstract class AbstractOutboundChannelAdapterParser extends AbstractChann @Override protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) { Element pollerElement = DomUtils.getChildElementByTagName(element, "poller"); - BeanDefinitionBuilder adapterBuilder = null; - adapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(OutboundChannelAdapter.class); - String consumerBeanName = this.parseAndRegisterConsumer(element, parserContext); - adapterBuilder.addConstructorArgReference(consumerBeanName); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerEndpointFactoryBean.class); + builder.addConstructorArgReference(this.parseAndRegisterConsumer(element, parserContext)); if (pollerElement != null) { Assert.hasText(channelName, "outbound channel adapter with a 'poller' requires a 'channel' to poll"); - IntegrationNamespaceUtils.configureTrigger(pollerElement, adapterBuilder); + IntegrationNamespaceUtils.configureTrigger(pollerElement, builder); Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); if (txElement != null) { - IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder); + IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder); } } - adapterBuilder.addPropertyReference("inputChannel", channelName); - return adapterBuilder.getBeanDefinition(); + builder.addPropertyValue("inputChannelName", channelName); + return builder.getBeanDefinition(); } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java index ac9d530fd9..f9ec546bd7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java @@ -22,9 +22,8 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aggregator.AggregatorEndpoint; -import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.integration.aggregator.CompletionStrategyAdapter; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.util.StringUtils; /** @@ -36,40 +35,35 @@ import org.springframework.util.StringUtils; */ public class AggregatorParser extends AbstractEndpointParser { - public static final String COMPLETION_STRATEGY_REF_ATTRIBUTE = "completion-strategy"; + private static final String COMPLETION_STRATEGY_REF_ATTRIBUTE = "completion-strategy"; - public static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method"; + private static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method"; - public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; + private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; - public static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; + private static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; - public static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; + private static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; - public static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; + private static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; - public static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; + private static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; - public static final String TIMEOUT_ATTRIBUTE = "timeout"; + private static final String TIMEOUT_ATTRIBUTE = "timeout"; private static final String COMPLETION_STRATEGY_PROPERTY = "completionStrategy"; - public static final String AGGREGATOR_ELEMENT = "aggregator"; - @Override - protected Class getEndpointClass() { - return AggregatorEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingAggregator.class; - } - - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - super.doParse(element, parserContext, builder); + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(AggregatorEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingAggregator.class)); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); final String completionStrategyRef = element.getAttribute(COMPLETION_STRATEGY_REF_ATTRIBUTE); final String completionStrategyMethod = element.getAttribute(COMPLETION_STRATEGY_METHOD_ATTRIBUTE); if (StringUtils.hasText(completionStrategyRef)) { @@ -82,12 +76,7 @@ public class AggregatorParser extends AbstractEndpointParser { builder.addPropertyReference(COMPLETION_STRATEGY_PROPERTY, completionStrategyRef); } } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); + return builder; } private String createCompletionStrategyAdapter(String ref, String method, ParserContext parserContext) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java index e8659c2a04..7ed0aa5efd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.filter.FilterEndpoint; import org.springframework.integration.filter.MethodInvokingSelector; @@ -28,13 +31,10 @@ import org.springframework.integration.filter.MethodInvokingSelector; public class FilterParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return FilterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingSelector.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FilterEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingSelector.class)); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java index b6f2e529ba..45a9764fa5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java @@ -19,7 +19,7 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aggregator.ResequencerEndpoint; /** @@ -27,33 +27,34 @@ import org.springframework.integration.aggregator.ResequencerEndpoint; * * @author Marius Bogoevici */ -public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { +public class ResequencerParser extends AbstractEndpointParser { - public static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; + private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; - public static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; + private static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; - public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; + private static final String RELEASE_PARTIAL_SEQUENCES = "release-partial-sequences"; + + private static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; + + private static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; + + private static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; + + private static final String TIMEOUT_ATTRIBUTE = "timeout"; @Override - protected Class getBeanClass(Element element) { - return ResequencerEndpoint.class; - } - - @Override - protected boolean isEligibleAttribute(String attributeName) { - return !INPUT_CHANNEL_ATTRIBUTE.equals(attributeName) - &&!OUTPUT_CHANNEL_ATTRIBUTE.equals(attributeName) - && !DISCARD_CHANNEL_ATTRIBUTE.equals(attributeName) - && super.isEligibleAttribute(attributeName); - } - - @Override - protected void postProcess(BeanDefinitionBuilder builder, Element element) { - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ResequencerEndpoint.class); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_PARTIAL_SEQUENCES); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java index 8aacf80681..11ee4ea885 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java @@ -20,7 +20,6 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.router.MethodInvokingChannelResolver; import org.springframework.integration.router.RouterEndpoint; @@ -32,18 +31,13 @@ import org.springframework.integration.router.RouterEndpoint; public class RouterParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return RouterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingChannelResolver.class; - } - - @Override - protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + String adapterBeanName = this.parseAdapter(element, parserContext, MethodInvokingChannelResolver.class); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RouterEndpoint.class); + builder.addConstructorArgReference(adapterBeanName); + builder.addPropertyReference("channelRegistry", MessageBusParser.MESSAGE_BUS_BEAN_NAME); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "default-output-channel"); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java index 9bd8add796..6e87c30d1b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.message.MessageMappingMethodInvoker; @@ -28,13 +31,11 @@ import org.springframework.integration.message.MessageMappingMethodInvoker; public class ServiceActivatorParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return ServiceActivatorEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MessageMappingMethodInvoker.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ServiceActivatorEndpoint.class); + String constructorArg = this.parseAdapter(element, parserContext, MessageMappingMethodInvoker.class); + builder.addConstructorArgReference(constructorArg); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java index f4abd0e47d..11a1adcde6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java @@ -18,7 +18,8 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.splitter.MethodInvokingSplitter; import org.springframework.integration.splitter.SplitterEndpoint; @@ -30,18 +31,13 @@ import org.springframework.integration.splitter.SplitterEndpoint; public class SplitterParser extends AbstractEndpointParser { @Override - protected boolean shouldCreateAdapter(Element element) { - return element.hasAttribute("ref"); - } - - @Override - protected Class getEndpointClass() { - return SplitterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingSplitter.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SplitterEndpoint.class); + if (element.hasAttribute("ref")) { + String adapterBeanName = this.parseAdapter(element, parserContext, MethodInvokingSplitter.class); + builder.addConstructorArgReference(adapterBeanName); + } + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java index bafeafba18..65fe7f5408 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.transformer.MethodInvokingTransformer; import org.springframework.integration.transformer.TransformerEndpoint; @@ -28,13 +31,10 @@ import org.springframework.integration.transformer.TransformerEndpoint; public class TransformerParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return TransformerEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingTransformer.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransformerEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingTransformer.class)); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index a890d865cd..7cc214c57b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -19,15 +19,21 @@ package org.springframework.integration.config.annotation; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import org.springframework.beans.factory.InitializingBean; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.Poller; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -58,55 +64,73 @@ public abstract class AbstractMethodAnnotationPostProcessor pollerFuture; - - private volatile TaskExecutor taskExecutor; - - private volatile int maxMessagesPerPoll = -1; + protected final Log logger = LogFactory.getLog(this.getClass()); private volatile ErrorHandler errorHandler; - private volatile boolean initialized; - - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - - - public void setInputChannel(MessageChannel inputChannel) { - this.inputChannel = inputChannel; - } - - public void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public void setTaskExecutor(TaskExecutor taskExecutor) { - this.taskExecutor = taskExecutor; - } /** * Provide an error handler for any Exceptions that occur - * upon invocation of this endpoint. If none is provided, + * upon invocation of this consumer. If none is provided, * the Exception messages will be logged (at warn level), * and the Exception rethrown. */ @@ -82,94 +48,11 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint this.errorHandler = errorHandler; } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - if (this.poller != null) { - this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); - } - } - - public final boolean isRunning() { - return this.running; - } - - @Override - protected void initialize() throws Exception { - synchronized (this.lifecycleMonitor) { - if (this.inputChannel instanceof PollableChannel && this.poller == null) { - this.poller = new ChannelPoller((PollableChannel) this.inputChannel, this.trigger); - this.poller.setMaxMessagesPerPoll(this.maxMessagesPerPoll); - this.configureTransactionSettingsForPoller(this.poller); - if (this.taskExecutor != null) { - this.poller.setTaskExecutor(this.taskExecutor); - } - this.poller.setConsumer(this); - } - this.initialized = true; - } - } - - public final void start() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - return; - } - if (!this.initialized) { - this.afterPropertiesSet(); - } - Assert.notNull(this.inputChannel, "failed to start endpoint, inputChannel is required"); - if (this.inputChannel instanceof SubscribableChannel) { - ((SubscribableChannel) inputChannel).subscribe(this); - } - else if (this.inputChannel instanceof PollableChannel) { - Assert.notNull(this.getTaskScheduler(), - "failed to start endpoint, no taskScheduler available"); - this.pollerFuture = this.getTaskScheduler().schedule(this.poller, this.poller.getTrigger()); - } - onStart(); - this.running = true; - } - } - - public final void stop() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - return; - } - if (this.inputChannel instanceof SubscribableChannel) { - ((SubscribableChannel) inputChannel).unsubscribe(this); - } - else if (this.pollerFuture != null) { - this.pollerFuture.cancel(true); - } - onStop(); - this.running = false; - } - } - - /** - * Subclasses might override this to supply their own start code (e.g. if they start threads - * on their own). This method will be called within the lifecycleMonitor. - */ - protected void onStart() { - - } - - /** - * Subclasses might override this to supply their own stop code (e.g. if they stop threads - * on their own).This method will be called within the lifecycleMonitor. - * - */ - protected void onStop() { - - } - public final void onMessage(Message message) { - if (message == null || message.getPayload() == null) { - throw new IllegalArgumentException("Message and its payload must not be null"); - } + Assert.notNull(message == null, "Message must not be null"); + Assert.notNull(message.getPayload(), "Message payload must not be null"); if (this.logger.isDebugEnabled()) { - this.logger.debug("endpoint '" + this + "' processing message: " + message); + this.logger.debug("consumer '" + this + "' processing message: " + message); } try { this.onMessageInternal(message); @@ -180,7 +63,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint } else { this.handleException(new MessageHandlingException(message, - "failure occurred in endpoint '" + this.toString() + "'", e)); + "failure occurred in consumer '" + this.toString() + "'", e)); } } } @@ -188,7 +71,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint protected void handleException(MessagingException exception) { if (this.errorHandler == null) { if (this.logger.isWarnEnabled()) { - this.logger.warn("exception occurred in endpoint '" + this + "'", exception); + this.logger.warn("exception occurred in consumer '" + this + "'", exception); } throw exception; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java index d4813025ff..7eea6b5dd9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java @@ -22,6 +22,7 @@ import java.util.List; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -30,6 +31,7 @@ import org.springframework.integration.message.MessageHeaders; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.selector.MessageSelector; +import org.springframework.util.Assert; /** * @author Mark Fisher @@ -44,6 +46,8 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon private volatile boolean requiresReply = false; + private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); + public void setOutputChannel(MessageChannel outputChannel) { this.outputChannel = outputChannel; @@ -74,7 +78,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon Object result = this.handle(message); if (result == null) { if (this.requiresReply) { - throw new MessageHandlingException(message, "endpoint '" + this + throw new MessageHandlingException(message, "consumer '" + this + "' requires a reply, but no reply was received"); } return; @@ -105,7 +109,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon protected boolean supports(Message message) { if (this.selector != null && !this.selector.accept(message)) { if (logger.isDebugEnabled()) { - logger.debug("selector for endpoint '" + this + "' rejected message: " + message); + logger.debug("selector for consumer '" + this + "' rejected message: " + message); } return false; } @@ -117,7 +121,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon } private boolean sendReplyMessage(Message replyMessage, MessageChannel replyChannel) { - return this.getChannelTemplate().send(replyMessage, replyChannel); + return this.channelTemplate.send(replyMessage, replyChannel); } private Message buildReplyMessage(Object result, MessageHeaders requestHeaders) { @@ -153,9 +157,9 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon replyChannel = (MessageChannel) returnAddress; } else if (returnAddress instanceof String) { - if (this.channelRegistry != null) { - replyChannel = this.channelRegistry.lookupChannel((String) returnAddress); - } + Assert.state(this.channelRegistry != null, + "ChannelRegistry is required for resolving a reply channel by name"); + replyChannel = this.channelRegistry.lookupChannel((String) returnAddress); } else { throw new MessagingException("expected a MessageChannel or String for 'returnAddress', but type is [" diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index bb9cb711fd..185af50eff 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -140,6 +140,9 @@ public abstract class AbstractPollingEndpoint implements MessageEndpoint, TaskSc if (!this.initialized) { this.afterPropertiesSet(); } + if (this.isRunning()) { + return; + } Assert.state(this.taskScheduler != null, "unable to start polling, no taskScheduler available"); this.runningTask = this.taskScheduler.schedule(new Poller(), this.trigger); @@ -176,7 +179,7 @@ public abstract class AbstractPollingEndpoint implements MessageEndpoint, TaskSc private void poll() { int count = 0; - while (maxMessagesPerPoll < 0 || count < maxMessagesPerPoll) { + while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) { if (!innerPoll()) { break; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java index 7e7ab5fea0..29f9268232 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java @@ -31,7 +31,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint { +public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint implements InitializingBean { private final MethodResolver methodResolver = new DefaultMethodResolver(ServiceActivator.class); @@ -56,9 +56,7 @@ public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint { } - @Override - protected void initialize() throws Exception { - super.initialize(); + public void afterPropertiesSet() throws Exception { if (this.invoker instanceof InitializingBean) { ((InitializingBean) this.invoker).afterPropertiesSet(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java index 6c25d1b9a6..52f0ece4a1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java @@ -22,6 +22,8 @@ import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; @@ -39,7 +41,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, InitializingBean { +public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistryAware, BeanFactoryAware, InitializingBean { private final MessageConsumer consumer; @@ -76,6 +78,16 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar this.inputChannelName = inputChannelName; } + public void setChannelRegistry(ChannelRegistry channelRegistry) { + if (this.consumer instanceof ChannelRegistryAware) { + ((ChannelRegistryAware) this.consumer).setChannelRegistry(channelRegistry); + } + } + + public void setTrigger(Trigger trigger) { + this.trigger = trigger; + } + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { this.maxMessagesPerPoll = maxMessagesPerPoll; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java index 966f0b81ce..73f7a6522b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java @@ -21,8 +21,14 @@ import org.springframework.integration.bus.MessageBusAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.channel.SubscribableChannel; +import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.MessagingGateway; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.util.Assert; @@ -42,7 +48,7 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); - private volatile ReplyMessageCorrelator replyMessageCorrelator; + private volatile MessageEndpoint replyMessageCorrelator; private volatile MessageBus messageBus; @@ -145,10 +151,23 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess return; } Assert.state(this.messageBus != null, "No MessageBus available. Cannot register ReplyMessageCorrelator."); - ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(); - correlator.setBeanName("internal.correlator." + this); - correlator.setInputChannel(this.replyChannel); - correlator.afterPropertiesSet(); + MessageEndpoint correlator = null; + MessageConsumer consumer = new AbstractMessageHandlingEndpoint() { + @Override + protected Object handle(Message message) { + return message; + } + }; + if (this.replyChannel instanceof SubscribableChannel) { + correlator = new SubscribingConsumerEndpoint( + consumer, (SubscribableChannel) this.replyChannel); + } + else if (this.replyChannel instanceof PollableChannel) { + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint( + consumer, (PollableChannel) this.replyChannel); + endpoint.afterPropertiesSet(); + correlator = endpoint; + } this.messageBus.registerEndpoint(correlator); this.replyMessageCorrelator = correlator; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java index 56521ac4cb..b20e4c378c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java @@ -21,6 +21,7 @@ import java.util.Collection; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; @@ -37,6 +38,8 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements private volatile boolean resolutionRequired; + private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); + public RouterEndpoint(ChannelResolver channelResolver) { Assert.notNull(channelResolver, "ChannelResolver must not be null"); @@ -59,7 +62,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements * default, there is no timeout, meaning the send will block indefinitely. */ public void setTimeout(long timeout) { - this.getChannelTemplate().setSendTimeout(timeout); + this.channelTemplate.setSendTimeout(timeout); } /** @@ -79,7 +82,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements if (results != null) { for (MessageChannel channel : results) { if (channel != null) { - if (this.getChannelTemplate().send(message, channel)) { + if (this.channelTemplate.send(message, channel)) { sent = true; } } @@ -87,7 +90,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements } if (!sent) { if (this.defaultOutputChannel != null) { - sent = this.getChannelTemplate().send(message, this.defaultOutputChannel); + sent = this.channelTemplate.send(message, this.defaultOutputChannel); } else if (this.resolutionRequired) { throw new MessageDeliveryException(message, diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java index 559258dcb9..085cf60afd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java @@ -22,7 +22,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.config.AbstractEndpointParser; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.transformer.Transformer; import org.springframework.integration.transformer.TransformerEndpoint; @@ -32,23 +31,15 @@ import org.springframework.integration.transformer.TransformerEndpoint; public abstract class AbstractTransformerParser extends AbstractEndpointParser { @Override - protected boolean shouldCreateAdapter(Element element) { - return false; - } - - @Override - protected Class getEndpointClass() { - return TransformerEndpoint.class; - } - - @Override - protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransformerEndpoint.class); BeanDefinitionBuilder transformerBuilder = BeanDefinitionBuilder.genericBeanDefinition(this.getTransformerClass()); this.parseTransformer(element, parserContext, transformerBuilder); String transformerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( transformerBuilder.getBeanDefinition(), parserContext.getRegistry()); builder.addConstructorArgReference(transformerBeanName); + return builder; } protected abstract Class getTransformerClass(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java index 9396d85beb..9db00b2cb4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java @@ -52,7 +52,7 @@ public class AggregatorEndpointTests { this.aggregator = new AggregatorEndpoint(new TestAggregator()); this.aggregator.setTaskScheduler(this.taskScheduler); this.taskScheduler.start(); - this.aggregator.onStart(); + this.aggregator.start(); } @Test @@ -325,7 +325,7 @@ public class AggregatorEndpointTests { @After public void stopTaskScheduler() { this.taskScheduler.stop(); - this.aggregator.onStop(); + this.aggregator.stop(); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java index 43f186a1ca..624d1e07ac 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java @@ -46,7 +46,7 @@ public class ResequencerEndpointTests { this.taskScheduler = Schedulers.createDefaultTaskScheduler(10); this.resequencer.setTaskScheduler(taskScheduler); taskScheduler.start(); - this.resequencer.onStart(); + this.resequencer.start(); } @Test @@ -149,7 +149,7 @@ public class ResequencerEndpointTests { @After public void stopTaskScheduler() { - this.resequencer.onStop(); + this.resequencer.stop(); this.taskScheduler.stop(); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 3cf7dc42bd..0e065674b8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -36,7 +36,9 @@ import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.ErrorMessage; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -62,17 +64,18 @@ public class DefaultMessageBusTests { Message message = MessageBuilder.withPayload("test") .setReturnAddress("targetChannel").build(); sourceChannel.send(message); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return message; } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(sourceChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel); + endpoint.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); context.refresh(); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); + consumer.setChannelRegistry(bus); bus.start(); Message result = targetChannel.receive(3000); assertEquals("test", result.getPayload()); @@ -116,12 +119,12 @@ public class DefaultMessageBusTests { QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); - AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return MessageBuilder.fromMessage(message).build(); } }; - AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return MessageBuilder.fromMessage(message).build(); } @@ -132,12 +135,12 @@ public class DefaultMessageBusTests { context.getBeanFactory().registerSingleton("input", inputChannel); context.getBeanFactory().registerSingleton("output1", outputChannel1); context.getBeanFactory().registerSingleton("output2", outputChannel2); - endpoint1.setBeanName("testEndpoint1"); - endpoint1.setInputChannel(inputChannel); - endpoint1.setOutputChannel(outputChannel1); - endpoint2.setBeanName("testEndpoint2"); - endpoint2.setInputChannel(inputChannel); - endpoint2.setOutputChannel(outputChannel2); + consumer1.setOutputChannel(outputChannel1); + consumer2.setOutputChannel(outputChannel2); + PollingConsumerEndpoint endpoint1 = new PollingConsumerEndpoint(consumer1, inputChannel); + endpoint1.afterPropertiesSet(); + PollingConsumerEndpoint endpoint2 = new PollingConsumerEndpoint(consumer2, inputChannel); + endpoint2.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); DefaultMessageBus bus = new DefaultMessageBus(); @@ -157,14 +160,14 @@ public class DefaultMessageBusTests { QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); final CountDownLatch latch = new CountDownLatch(2); - AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { Message reply = MessageBuilder.fromMessage(message).build(); latch.countDown(); return reply; } }; - AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { Message reply = MessageBuilder.fromMessage(message).build(); latch.countDown(); @@ -177,12 +180,10 @@ public class DefaultMessageBusTests { context.getBeanFactory().registerSingleton("input", inputChannel); context.getBeanFactory().registerSingleton("output1", outputChannel1); context.getBeanFactory().registerSingleton("output2", outputChannel2); - endpoint1.setBeanName("testEndpoint1"); - endpoint1.setInputChannel(inputChannel); - endpoint1.setOutputChannel(outputChannel1); - endpoint2.setBeanName("testEndpoint2"); - endpoint2.setInputChannel(inputChannel); - endpoint2.setOutputChannel(outputChannel2); + consumer1.setOutputChannel(outputChannel1); + consumer2.setOutputChannel(outputChannel2); + SubscribingConsumerEndpoint endpoint1 = new SubscribingConsumerEndpoint(consumer1, inputChannel); + SubscribingConsumerEndpoint endpoint2 = new SubscribingConsumerEndpoint(consumer2, inputChannel); context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); DefaultMessageBus bus = new DefaultMessageBus(); @@ -246,14 +247,14 @@ public class DefaultMessageBusTests { errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel); final CountDownLatch latch = new CountDownLatch(1); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { latch.countDown(); return null; } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(errorChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, errorChannel); + endpoint.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 5d0c1054b5..f793e53567 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -31,11 +31,10 @@ import org.springframework.integration.channel.ThreadLocalChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageMappingMethodInvoker; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.util.MethodInvoker; /** * @author Mark Fisher @@ -61,11 +60,9 @@ public class DirectChannelSubscriptionTests { @Test public void testSendAndReceiveForRegisteredEndpoint() { GenericApplicationContext context = new GenericApplicationContext(); - MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle"); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker); - endpoint.setInputChannel(sourceChannel); - endpoint.setOutputChannel(targetChannel); - endpoint.setBeanName("testEndpoint"); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "handle"); + serviceActivator.setOutputChannel(targetChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); bus.setApplicationContext(context); bus.start(); @@ -96,14 +93,13 @@ public class DirectChannelSubscriptionTests { QueueChannel errorChannel = new QueueChannel(); errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); bus.registerChannel(errorChannel); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { throw new RuntimeException("intentional test failure"); } }; - endpoint.setInputChannel(sourceChannel); - endpoint.setOutputChannel(targetChannel); - endpoint.setBeanName("testEndpoint"); + consumer.setOutputChannel(targetChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel); bus.registerEndpoint(endpoint); bus.start(); this.sourceChannel.send(new StringMessage("foo")); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml index c7468ea49d..a2aee37bee 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml @@ -10,9 +10,13 @@ - + + + + + + - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java index 33362f17fc..1987d78303 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; @@ -46,13 +47,13 @@ public class MessageChannelTemplateTests { public void setUp() { this.requestChannel = new QueueChannel(); this.requestChannel.setBeanName("requestChannel"); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return new StringMessage(message.getPayload().toString().toUpperCase()); } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(requestChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, requestChannel); + endpoint.afterPropertiesSet(); GenericApplicationContext context = new GenericApplicationContext(); context.getBeanFactory().registerSingleton("requestChannel", requestChannel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 861795a16a..56253a0957 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -28,11 +28,11 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.AggregatorEndpoint; import org.springframework.integration.aggregator.CompletionStrategy; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.util.MethodInvoker; @@ -53,15 +53,14 @@ public class AggregatorParserTests { @Test public void testAggregation() { - AggregatorEndpoint endpoint = - (AggregatorEndpoint) context.getBean("aggregatorWithReference"); + MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput"); TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 1, null)); outboundMessages.add(createMessage("789", "id1", 3, 3, null)); outboundMessages.add(createMessage("456", "id1", 3, 2, null)); for (Message message : outboundMessages) { - endpoint.onMessage(message); + input.send(message); } Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean .getAggregatedMessages().size()); @@ -72,13 +71,14 @@ public class AggregatorParserTests { @Test public void testPropertyAssignment() throws Exception { - AggregatorEndpoint endpoint = - (AggregatorEndpoint) context.getBean("completelyDefinedAggregator"); + SubscribingConsumerEndpoint endpoint = + (SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator"); TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean"); CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + DirectFieldAccessor accessor = new DirectFieldAccessor( + new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance", testAggregator, accessor.getPropertyValue("aggregator")); Assert.assertEquals( @@ -105,13 +105,13 @@ public class AggregatorParserTests { @Test public void testSimpleJavaBeanAggregator() { List> outboundMessages = new ArrayList>(); - AggregatorEndpoint addingAggregator = - (AggregatorEndpoint) context.getBean("aggregatorWithReferenceAndMethod"); + MessageChannel input = + (MessageChannel) context.getBean("aggregatorWithReferenceAndMethodInput"); outboundMessages.add(createMessage(1l, "id1", 3, 1, null)); outboundMessages.add(createMessage(2l, "id1", 3, 3, null)); outboundMessages.add(createMessage(3l, "id1", 3, 2, null)); for (Message message : outboundMessages) { - addingAggregator.onMessage(message); + input.send(message); } PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message response = outputChannel.receive(); @@ -130,23 +130,24 @@ public class AggregatorParserTests { } @Test - public void testAggregatorWithPojoCompletionStrategy(){ - AggregatorEndpoint aggregatorWithPojoCompletionStrategy = - (AggregatorEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy"); - CompletionStrategy completionStrategy = (CompletionStrategy) - new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy"); + public void testAggregatorWithPojoCompletionStrategy() { + MessageChannel input = (MessageChannel) context.getBean("aggregatorWithPojoCompletionStrategyInput"); + SubscribingConsumerEndpoint endpoint = + (SubscribingConsumerEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy"); + CompletionStrategy completionStrategy = (CompletionStrategy) new DirectFieldAccessor( + new DirectFieldAccessor(endpoint).getPropertyValue("consumer")).getPropertyValue("completionStrategy"); Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter); DirectFieldAccessor completionStrategyAccessor = new DirectFieldAccessor(completionStrategy); MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker"); Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy); Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness")); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(1l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(2l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(3l, "id1", 0 , 0, null)); + input.send(createMessage(1l, "id1", 0 , 0, null)); + input.send(createMessage(2l, "id1", 0 , 0, null)); + input.send(createMessage(3l, "id1", 0 , 0, null)); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message reply = outputChannel.receive(0); Assert.assertNull(reply); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(5l, "id1", 0 , 0, null)); + input.send(createMessage(5l, "id1", 0 , 0, null)); reply = outputChannel.receive(0); Assert.assertNotNull(reply); Assert.assertEquals(11l, reply.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java index 6e8b3ad814..ff574e0c52 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java @@ -28,7 +28,7 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; import org.springframework.test.context.ContextConfiguration; @@ -50,7 +50,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertNotNull(bus.lookupChannel(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); - assertTrue(adapter instanceof OutboundChannelAdapter); + assertTrue(adapter instanceof SubscribingConsumerEndpoint); TestConsumer consumer = (TestConsumer) this.applicationContext.getBean("consumer"); assertNull(consumer.getLastMessage()); Message message = new StringMessage("test"); @@ -70,7 +70,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertNotNull(bus.lookupChannel(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); - assertTrue(adapter instanceof OutboundChannelAdapter); + assertTrue(adapter instanceof SubscribingConsumerEndpoint); TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); assertNull(testBean.getMessage()); Message message = new StringMessage("consumer test"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index b32be1fd3b..8d9197a744 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -30,7 +30,6 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; @@ -56,11 +55,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorAccepts() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); + MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); QueueChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.withPayload("test") .setReturnAddress(replyChannel).build(); - endpoint.onMessage(message); + inputChannel.send(message); Message reply = replyChannel.receive(500); assertNotNull(reply); assertEquals("foo", reply.getPayload()); @@ -70,11 +69,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorRejects() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); + MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); MessageChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.withPayload(123) .setReturnAddress(replyChannel).build(); - endpoint.onMessage(message); + inputChannel.send(message); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 9a7bbb3a64..6e1fb26847 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -26,9 +26,9 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.ResequencerEndpoint; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -70,8 +70,8 @@ public class ResequencerParserTests { @Test public void testDefaultResequencerProperties() { - ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("defaultResequencer"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("defaultResequencer"); + DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertNull(accessor.getPropertyValue("outputChannel")); Assert.assertNull(accessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value", @@ -92,10 +92,10 @@ public class ResequencerParserTests { @Test public void testPropertyAssignment() throws Exception { - ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("completelyDefinedResequencer"); + SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedResequencer"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel", outputChannel, accessor.getPropertyValue("outputChannel")); Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel", diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 815eaf2417..973e9bc8d2 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -9,7 +9,6 @@ - @@ -17,10 +16,12 @@ - + + + + + - - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java index abb2112aee..35329a5296 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java @@ -43,9 +43,9 @@ public class CorrelationIdTests { .setCorrelationId(correlationId).build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -57,9 +57,9 @@ public class CorrelationIdTests { Message message = MessageBuilder.withPayload("test").build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -72,9 +72,9 @@ public class CorrelationIdTests { .setCorrelationId("correlationId").build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -89,9 +89,9 @@ public class CorrelationIdTests { .setCorrelationId(correlationId).build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -103,9 +103,9 @@ public class CorrelationIdTests { Message message = new StringMessage("test"); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java index ac1ea5f2e1..b0d2a895ec 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java @@ -33,11 +33,10 @@ public class ServiceActivatorMethodResolutionTests { @Test public void singleAnnotationMatches() { SingleAnnotationTestBean testBean = new SingleAnnotationTestBean(); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean); QueueChannel outputChannel = new QueueChannel(); - endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); - endpoint.onMessage(new StringMessage("foo")); + serviceActivator.setOutputChannel(outputChannel); + serviceActivator.onMessage(new StringMessage("foo")); Message result = outputChannel.receive(0); assertEquals("FOO", result.getPayload()); } @@ -51,11 +50,10 @@ public class ServiceActivatorMethodResolutionTests { @Test public void singlePublicMethodMatches() { SinglePublicMethodTestBean testBean = new SinglePublicMethodTestBean(); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean); QueueChannel outputChannel = new QueueChannel(); - endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); - endpoint.onMessage(new StringMessage("foo")); + serviceActivator.setOutputChannel(outputChannel); + serviceActivator.onMessage(new StringMessage("foo")); Message result = outputChannel.receive(0); assertEquals("FOO", result.getPayload()); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java index bfe322cf3f..35698d5647 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.filter.FilterEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -65,9 +66,9 @@ public class FilterEndpointTests { return true; } }); - filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.start(); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel); + endpoint.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -84,9 +85,9 @@ public class FilterEndpointTests { return false; } }); - filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.start(); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel); + endpoint.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); assertNull(outputChannel.receive(0)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java index 0e3c2ce9b2..e156fc4169 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; /** @@ -82,9 +83,8 @@ public class MethodInvokingConsumerTests { Message message = new GenericMessage("testing"); channel.send(message); assertNull(queue.poll()); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(consumer); - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(channel); + ServiceActivatorEndpoint serivceActivator = new ServiceActivatorEndpoint(consumer); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(serivceActivator, channel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml index 4807ae144e..eb4c9c9193 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml @@ -14,12 +14,15 @@ - - + +