diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java index bf823a2725..b4db69d314 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java @@ -39,10 +39,6 @@ public abstract class AbstractRemotingOutboundGateway extends AbstractMessageHan } - public void setRequestChannel(MessageChannel requestChannel) { - this.setInputChannel(requestChannel); - } - public void setReplyChannel(MessageChannel replyChannel) { this.setOutputChannel(replyChannel); } diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java index d1d6bc6537..73945fe5f0 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingGatewayParser.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -33,7 +34,9 @@ import org.springframework.util.StringUtils; */ public abstract class AbstractRemotingGatewayParser extends AbstractSimpleBeanDefinitionParser { - protected abstract Class getBeanClass(Element element); + protected Class getBeanClass(Element element) { + return ConsumerEndpointFactoryBean.class; + } @Override diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java index ddf3f52815..b07dd5d7c7 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/config/AbstractRemotingOutboundGatewayParser.java @@ -18,26 +18,64 @@ package org.springframework.integration.adapter.config; import org.w3c.dom.Element; +import org.springframework.beans.factory.BeanDefinitionStoreException; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.AbstractEndpointParser; import org.springframework.util.Assert; +import org.springframework.util.StringUtils; /** * Base class for url-based remoting outbound gateway parsers. * * @author Mark Fisher */ -public abstract class AbstractRemotingOutboundGatewayParser extends AbstractRemotingGatewayParser { +public abstract class AbstractRemotingOutboundGatewayParser extends AbstractEndpointParser { + + protected abstract Class getGatewayClass(Element element); @Override - protected boolean isEligibleAttribute(String attributeName) { - return !attributeName.equals("url") && super.isEligibleAttribute(attributeName); + protected String getInputChannelAttributeName() { + return "request-channel"; } @Override - protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { + protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) + throws BeanDefinitionStoreException { + String id = super.resolveId(element, definition, parserContext); + if (!StringUtils.hasText(id)) { + id = element.getAttribute("name"); + } + if (!StringUtils.hasText(id)) { + id = parserContext.getReaderContext().generateBeanName(definition); + } + return id; + } + + @Override + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(this.getGatewayClass(element)); + String url = this.parseUrl(element); + builder.addConstructorArgValue(url); + String replyChannel = element.getAttribute("reply-channel"); + if (StringUtils.hasText(replyChannel)) { + builder.addPropertyReference("replyChannel", replyChannel); + } + this.postProcessGateway(builder, element); + return builder; + } + + protected String parseUrl(Element element) { String url = element.getAttribute("url"); Assert.hasText(url, "The 'url' attribute is required."); - builder.addConstructorArgValue(url); + return url; + } + + /** + * Subclasses may override this method for additional configuration. + */ + protected void postProcessGateway(BeanDefinitionBuilder builder, Element element) { } } diff --git a/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java b/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java index 2fb35dd2b9..208481e1c4 100644 --- a/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java +++ b/org.springframework.integration.file/src/test/java/org/springframework/integration/file/config/FileOutboundChannelAdapterParserTests.java @@ -27,7 +27,7 @@ import org.junit.runner.RunWith; import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.file.DefaultFileNameGenerator; import org.springframework.integration.file.FileWritingMessageConsumer; import org.springframework.test.context.ContextConfiguration; @@ -43,11 +43,11 @@ public class FileOutboundChannelAdapterParserTests { @Autowired @Qualifier("simpleAdapter") - OutboundChannelAdapter simpleAdapter; + SubscribingConsumerEndpoint simpleAdapter; @Autowired @Qualifier("adapterWithCustomNameGenerator") - OutboundChannelAdapter adapterWithCustomNameGenerator; + SubscribingConsumerEndpoint adapterWithCustomNameGenerator; @Test diff --git a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java index 17bcd4e806..9941798051 100644 --- a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java +++ b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParser.java @@ -29,7 +29,7 @@ import org.springframework.integration.httpinvoker.HttpInvokerOutboundGateway; public class HttpInvokerOutboundGatewayParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return HttpInvokerOutboundGateway.class; } diff --git a/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java b/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java index 89191765bb..838bde8d01 100644 --- a/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java +++ b/org.springframework.integration.httpinvoker/src/test/java/org/springframework/integration/httpinvoker/config/HttpInvokerOutboundGatewayParserTests.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.httpinvoker.HttpInvokerOutboundGateway; /** @@ -33,7 +35,9 @@ public class HttpInvokerOutboundGatewayParserTests { public void testHttpInvokerOutboundGatewayParser() { ApplicationContext context = new ClassPathXmlApplicationContext( "httpInvokerOutboundGatewayParserTests.xml", this.getClass()); - Object gateway = context.getBean("gateway"); + Object endpoint = context.getBean("gateway"); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); assertEquals(HttpInvokerOutboundGateway.class, gateway.getClass()); } diff --git a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java index d0ffb58ae5..afe8a84215 100644 --- a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java +++ b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParser.java @@ -20,7 +20,6 @@ import java.rmi.registry.Registry; import org.w3c.dom.Element; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.integration.adapter.config.AbstractRemotingOutboundGatewayParser; import org.springframework.integration.rmi.RmiInboundGateway; import org.springframework.integration.rmi.RmiOutboundGateway; @@ -35,28 +34,19 @@ import org.springframework.util.StringUtils; public class RmiOutboundGatewayParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return RmiOutboundGateway.class; } @Override - protected boolean isEligibleAttribute(String attributeName) { - return !"host".equals(attributeName) - && !"port".equals(attributeName) - && !"remote-channel".equals(attributeName) - && super.isEligibleAttribute(attributeName); - } - - @Override - protected void doPostProcess(BeanDefinitionBuilder builder, Element element) { + protected String parseUrl(Element element) { String host = element.getAttribute("host"); String remoteChannel = element.getAttribute("remote-channel"); Assert.isTrue(StringUtils.hasText(host) && StringUtils.hasText(remoteChannel), "The 'host' and 'remote-channel' attributes are both required"); String portAttribute = element.getAttribute("port"); String port = StringUtils.hasText(portAttribute) ? portAttribute : "" + Registry.REGISTRY_PORT; - String url = "rmi://" + host + ":" + port + "/" + RmiInboundGateway.SERVICE_NAME_PREFIX + remoteChannel; - builder.addConstructorArgValue(url); + return "rmi://" + host + ":" + port + "/" + RmiInboundGateway.SERVICE_NAME_PREFIX + remoteChannel; } } diff --git a/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java b/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java index 9a674e4272..af59956052 100644 --- a/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java +++ b/org.springframework.integration.rmi/src/test/java/org/springframework/integration/rmi/config/RmiOutboundGatewayParserTests.java @@ -29,7 +29,6 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; import org.springframework.integration.rmi.RmiInboundGateway; -import org.springframework.integration.rmi.RmiOutboundGateway; /** * @author Mark Fisher @@ -53,8 +52,8 @@ public class RmiOutboundGatewayParserTests { public void directInvocation() { ApplicationContext context = new ClassPathXmlApplicationContext( "rmiOutboundGatewayParserTests.xml", this.getClass()); - RmiOutboundGateway gateway = (RmiOutboundGateway) context.getBean("gateway"); - gateway.handle(new StringMessage("test")); + MessageChannel localChannel = (MessageChannel) context.getBean("localChannel"); + localChannel.send(new StringMessage("test")); Message result = testChannel.receive(1000); assertNotNull(result); assertEquals("test", result.getPayload()); diff --git a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java index ebf2740999..95c882541d 100644 --- a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java +++ b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java @@ -19,11 +19,10 @@ package org.springframework.integration.ws.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; -import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.ConfigurationException; +import org.springframework.integration.adapter.config.AbstractRemotingOutboundGatewayParser; import org.springframework.integration.ws.handler.MarshallingWebServiceHandler; import org.springframework.integration.ws.handler.SimpleWebServiceHandler; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -31,37 +30,28 @@ import org.springframework.util.StringUtils; * * @author Mark Fisher */ -public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser { +public class WebServiceHandlerParser extends AbstractRemotingOutboundGatewayParser { @Override - protected Class getBeanClass(Element element) { + protected Class getGatewayClass(Element element) { return (StringUtils.hasText(element.getAttribute("marshaller"))) ? MarshallingWebServiceHandler.class : SimpleWebServiceHandler.class; } @Override - protected boolean shouldGenerateId() { - return false; + protected String getInputChannelAttributeName() { + return "input-channel"; } @Override - protected boolean shouldGenerateIdAsFallback() { - return true; - } - - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected String parseUrl(Element element) { String uri = element.getAttribute("uri"); - if (!StringUtils.hasText(uri)) { - throw new ConfigurationException("The 'uri' attribute is required."); - } - builder.addConstructorArgValue(uri); - String inputChannel = element.getAttribute("input-channel"); - builder.addPropertyReference("inputChannel", inputChannel); - String outputChannel = element.getAttribute("output-channel"); - if (StringUtils.hasText(outputChannel)) { - builder.addPropertyReference("outputChannel", outputChannel); - } + Assert.hasText(uri, "The 'uri' attribute is required."); + return uri; + } + + @Override + protected void postProcessGateway(BeanDefinitionBuilder builder, Element element) { String marshallerRef = element.getAttribute("marshaller"); if (StringUtils.hasText(marshallerRef)) { builder.addConstructorArgReference(marshallerRef); @@ -91,13 +81,10 @@ public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser if (StringUtils.hasText(faultMessageResolverRef)) { builder.addPropertyReference("faultMessageResolver", faultMessageResolverRef); } - String messageSenderRef = element.getAttribute("message-sender"); String messageSenderListRef = element.getAttribute("message-senders"); - if(StringUtils.hasText(messageSenderRef) && StringUtils.hasText(messageSenderListRef)){ - throw new ConfigurationException("Only one of message-sender or mesage-senders should be specified"); - } - + Assert.isTrue(!(StringUtils.hasText(messageSenderRef) && StringUtils.hasText(messageSenderListRef)), + "Only one of message-sender or message-senders should be specified"); if (StringUtils.hasText(messageSenderRef)) { builder.addPropertyReference("messageSender", messageSenderRef); } diff --git a/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java b/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java index 66d30177e6..acd01b426c 100644 --- a/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java +++ b/org.springframework.integration.ws/src/test/java/org/springframework/integration/ws/config/WebServiceHandlerParserTests.java @@ -24,6 +24,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.ws.handler.MarshallingWebServiceHandler; import org.springframework.integration.ws.handler.SimpleWebServiceHandler; import org.springframework.oxm.Marshaller; @@ -44,8 +45,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithDefaultSourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); assertEquals("DefaultSourceExtractor", accessor.getPropertyValue("sourceExtractor").getClass().getSimpleName()); } @@ -54,8 +57,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomSourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); SourceExtractor sourceExtractor = (SourceExtractor) context.getBean("sourceExtractor"); assertEquals(sourceExtractor, accessor.getPropertyValue("sourceExtractor")); } @@ -65,8 +70,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomRequestCallback"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); WebServiceMessageCallback callback = (WebServiceMessageCallback) context.getBean("requestCallback"); assertEquals(callback, accessor.getPropertyValue("requestCallback")); } @@ -76,8 +83,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageFactory"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageFactory factory = (WebServiceMessageFactory) context.getBean("messageFactory"); assertEquals(factory, accessor.getPropertyValue("messageFactory")); @@ -89,8 +98,10 @@ public class WebServiceHandlerParserTests { "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomSourceExtractorAndMessageFactory"); SourceExtractor sourceExtractor = (SourceExtractor) context.getBean("sourceExtractor"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); assertEquals(sourceExtractor, accessor.getPropertyValue("sourceExtractor")); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageFactory factory = (WebServiceMessageFactory) context.getBean("messageFactory"); @@ -102,8 +113,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomFaultMessageResolver"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); FaultMessageResolver resolver = (FaultMessageResolver) context.getBean("faultMessageResolver"); assertEquals(resolver, accessor.getPropertyValue("faultMessageResolver")); @@ -115,8 +128,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageSender"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageSender messageSender = (WebServiceMessageSender) context.getBean("messageSender"); assertEquals(messageSender, ((WebServiceMessageSender[])accessor.getPropertyValue("messageSenders"))[0]); @@ -126,8 +141,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "simpleWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomMessageSenderList"); - assertEquals(SimpleWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(SimpleWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); accessor = new DirectFieldAccessor(accessor.getPropertyValue("webServiceTemplate")); WebServiceMessageSender messageSender = (WebServiceMessageSender) context.getBean("messageSender"); assertEquals(messageSender, ((WebServiceMessageSender[])accessor.getPropertyValue("messageSenders"))[0]); @@ -139,10 +156,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithAllInOneMarshaller"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshallerAndUnmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); assertEquals(marshaller, templateAccessor.getPropertyValue("unmarshaller")); @@ -153,10 +172,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithSeparateMarshallerAndUnmarshaller"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshaller"); Unmarshaller unmarshaller = (Unmarshaller) context.getBean("unmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); @@ -168,8 +189,10 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithCustomRequestCallback"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(gateway); WebServiceMessageCallback callback = (WebServiceMessageCallback) context.getBean("requestCallback"); assertEquals(callback, accessor.getPropertyValue("requestCallback")); } @@ -179,10 +202,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithAllInOneMarshallerAndMessageFactory"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshallerAndUnmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); assertEquals(marshaller, templateAccessor.getPropertyValue("unmarshaller")); @@ -195,10 +220,12 @@ public class WebServiceHandlerParserTests { ApplicationContext context = new ClassPathXmlApplicationContext( "marshallingWebServiceHandlerParserTests.xml", this.getClass()); MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerWithSeparateMarshallerAndUnmarshallerAndMessageFactory"); - assertEquals(MarshallingWebServiceHandler.class, endpoint.getClass()); - DirectFieldAccessor handlerAccessor = new DirectFieldAccessor(endpoint); + assertEquals(SubscribingConsumerEndpoint.class, endpoint.getClass()); + Object gateway = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + assertEquals(MarshallingWebServiceHandler.class, gateway.getClass()); + DirectFieldAccessor gatewayAccessor = new DirectFieldAccessor(gateway); DirectFieldAccessor templateAccessor = new DirectFieldAccessor( - handlerAccessor.getPropertyValue("webServiceTemplate")); + gatewayAccessor.getPropertyValue("webServiceTemplate")); Marshaller marshaller = (Marshaller) context.getBean("marshaller"); Unmarshaller unmarshaller = (Unmarshaller) context.getBean("unmarshaller"); assertEquals(marshaller, templateAccessor.getPropertyValue("marshaller")); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java index a69ab622af..1eaf4bdcd6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java @@ -27,26 +27,29 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.BlockingChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; /** - * Base class for {@link MessageBarrier}-based MessageHandlers. - * A {@link MessageEndpoint} implementation that waits for a group of + * Base class for {@link MessageBarrier}-based Message Consumers. + * A {@link MessageConsumer} implementation that waits for a group of * {@link Message Messages} to arrive and processes them together. - * Uses a {@link MessageBarr ier} to store messages and to decide how + * Uses a {@link MessageBarrier} to store messages and to decide how * the messages should be released. *

- * Each {@link Message} that is received by this endpoint will be associated with + * Each {@link Message} that is received by this consumer will be associated with * a group based upon the 'correlationId' property of its * header. If no such property is available, a {@link MessageHandlingException} * will be thrown. @@ -60,7 +63,7 @@ import org.springframework.util.ObjectUtils; * @author Mark Fisher * @author Marius Bogoevici */ -public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHandlingEndpoint implements TaskSchedulerAware { +public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHandlingEndpoint implements TaskSchedulerAware, InitializingBean { public final static long DEFAULT_SEND_TIMEOUT = 1000; @@ -91,8 +94,11 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHand private volatile boolean initialized; + private TaskScheduler taskScheduler; + private ScheduledFuture reaperFutureTask; + /** * Specify a channel for sending Messages that arrive after their aggregation * group has either completed or timed-out. @@ -141,26 +147,32 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractMessageHand this.timeout = timeout; } - /** - * Initialize this endpoint. - */ - @Override - protected void initialize() throws Exception { - super.initialize(); + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.taskScheduler = taskScheduler; + } + + public void afterPropertiesSet() { this.trackedCorrelationIds = new ArrayBlockingQueue(this.trackedCorrelationIdCapacity); this.initialized = true; } - - @Override - protected void onStart() { - super.onStart(); - this.reaperFutureTask = this.getTaskScheduler().schedule(new ReaperTask(), new IntervalTrigger(reaperInterval, TimeUnit.MILLISECONDS)); + + public boolean isRunning() { + return this.reaperFutureTask != null; } - @Override - protected void onStop() { - super.onStop(); - this.reaperFutureTask.cancel(true); + public void start() { + if (this.isRunning()) { + return; + } + Assert.state(this.taskScheduler != null, "TaskScheduler must not be null"); + this.reaperFutureTask = this.taskScheduler.schedule( + new ReaperTask(), new IntervalTrigger(this.reaperInterval, TimeUnit.MILLISECONDS)); + } + + public void stop() { + if (this.isRunning()) { + this.reaperFutureTask.cancel(true); + } } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java index 5825091ca4..05acc20710 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java @@ -18,13 +18,11 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.config.BeanDefinitionHolder; -import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.util.xml.DomUtils; @@ -40,8 +38,6 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio protected static final String METHOD_ATTRIBUTE = "method"; - protected static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; - protected static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; private static final String POLLER_ELEMENT = "poller"; @@ -51,20 +47,9 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio private static final String ERROR_HANDLER_ATTRIBUTE = "error-handler"; - /** - * Subclasses may override this method to determine whether the endpoint - * type should create an adapter. If so, the "ref" attribute will be - * required, and the "method" attribute will typically be used as well. - * - *

The default is true. - */ - protected boolean shouldCreateAdapter(Element element) { - return true; - } - @Override protected final Class getBeanClass(Element element) { - return this.getEndpointClass(); + return ConsumerEndpointFactoryBean.class; } @Override @@ -77,22 +62,42 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio return true; } - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - if (this.shouldCreateAdapter(element)) { - String ref = element.getAttribute(REF_ATTRIBUTE); - Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required."); - if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { - String method = element.getAttribute(METHOD_ATTRIBUTE); - String adapterBeanName = this.parseAdapter(ref, method, element, parserContext); - builder.addConstructorArgReference(adapterBeanName); - } - else { - builder.addConstructorArgReference(ref); - } + /** + * Parse the MessageConsumer. + */ + protected abstract BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext); + + protected String getInputChannelAttributeName() { + return "input-channel"; + } + + protected String parseAdapter(Element element, ParserContext parserContext, Class adapterClass) { + String ref = element.getAttribute(REF_ATTRIBUTE); + Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required."); + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(adapterClass); + String method = element.getAttribute(METHOD_ATTRIBUTE); + builder.addConstructorArgReference(ref); + builder.addConstructorArgValue(method); + return BeanDefinitionReaderUtils.registerWithGeneratedName( + builder.getBeanDefinition(), parserContext.getRegistry()); } - String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE); - Assert.hasText(inputChannel, "the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required"); + return ref; + } + + @Override + protected final void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + BeanDefinitionBuilder consumerBuilder = this.parseConsumer(element, parserContext); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, OUTPUT_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, SELECTOR_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, ERROR_HANDLER_ATTRIBUTE); + String consumerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( + consumerBuilder.getBeanDefinition(), parserContext.getRegistry()); + builder.addConstructorArgReference(consumerBeanName); + String inputChannelAttributeName = this.getInputChannelAttributeName(); + String inputChannelName = element.getAttribute(inputChannelAttributeName); + Assert.hasText(inputChannelName, "the '" + inputChannelAttributeName + "' attribute is required"); + builder.addPropertyValue("inputChannelName", inputChannelName); Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); if (pollerElement != null) { IntegrationNamespaceUtils.configureTrigger(pollerElement, builder); @@ -102,41 +107,13 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio } IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, pollerElement, "task-executor"); } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, SELECTOR_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); this.postProcess(element, parserContext, builder); } - private String parseAdapter(String ref, String method, Element element, ParserContext parserContext) { - Class adapterClass = this.getMethodInvokingAdapterClass(); - Assert.state(adapterClass != null, - "Parser needs to create an adapter but 'getMethodInvokingAdapterClass()' returned null."); - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(adapterClass); - builder.addConstructorArgReference(ref); - builder.addConstructorArgValue(method); - String adapterBeanName = BeanDefinitionReaderUtils.generateBeanName( - builder.getBeanDefinition(), parserContext.getRegistry()); - BeanDefinitionHolder holder = new BeanDefinitionHolder(builder.getBeanDefinition(), adapterBeanName); - parserContext.registerBeanComponent(new BeanComponentDefinition(holder)); - return adapterBeanName; - } - /** * Subclasses may implement this method to provide additional configuration. */ protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { } - /** - * Subclasses must override this if the adapted object is created from - * the "ref" and "method" attribute values. - */ - protected Class getMethodInvokingAdapterClass() { - return null; - } - - protected abstract Class getEndpointClass(); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java index fa37eb762f..2796403a5f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractOutboundChannelAdapterParser.java @@ -22,7 +22,7 @@ import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.config.ConsumerEndpointFactoryBean; import org.springframework.util.Assert; import org.springframework.util.xml.DomUtils; @@ -36,20 +36,18 @@ public abstract class AbstractOutboundChannelAdapterParser extends AbstractChann @Override protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) { Element pollerElement = DomUtils.getChildElementByTagName(element, "poller"); - BeanDefinitionBuilder adapterBuilder = null; - adapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(OutboundChannelAdapter.class); - String consumerBeanName = this.parseAndRegisterConsumer(element, parserContext); - adapterBuilder.addConstructorArgReference(consumerBeanName); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConsumerEndpointFactoryBean.class); + builder.addConstructorArgReference(this.parseAndRegisterConsumer(element, parserContext)); if (pollerElement != null) { Assert.hasText(channelName, "outbound channel adapter with a 'poller' requires a 'channel' to poll"); - IntegrationNamespaceUtils.configureTrigger(pollerElement, adapterBuilder); + IntegrationNamespaceUtils.configureTrigger(pollerElement, builder); Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); if (txElement != null) { - IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder); + IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder); } } - adapterBuilder.addPropertyReference("inputChannel", channelName); - return adapterBuilder.getBeanDefinition(); + builder.addPropertyValue("inputChannelName", channelName); + return builder.getBeanDefinition(); } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java index ac9d530fd9..f9ec546bd7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java @@ -22,9 +22,8 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aggregator.AggregatorEndpoint; -import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.integration.aggregator.CompletionStrategyAdapter; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.util.StringUtils; /** @@ -36,40 +35,35 @@ import org.springframework.util.StringUtils; */ public class AggregatorParser extends AbstractEndpointParser { - public static final String COMPLETION_STRATEGY_REF_ATTRIBUTE = "completion-strategy"; + private static final String COMPLETION_STRATEGY_REF_ATTRIBUTE = "completion-strategy"; - public static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method"; + private static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method"; - public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; + private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; - public static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; + private static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; - public static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; + private static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; - public static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; + private static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; - public static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; + private static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; - public static final String TIMEOUT_ATTRIBUTE = "timeout"; + private static final String TIMEOUT_ATTRIBUTE = "timeout"; private static final String COMPLETION_STRATEGY_PROPERTY = "completionStrategy"; - public static final String AGGREGATOR_ELEMENT = "aggregator"; - @Override - protected Class getEndpointClass() { - return AggregatorEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingAggregator.class; - } - - @Override - protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { - super.doParse(element, parserContext, builder); + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(AggregatorEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingAggregator.class)); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); final String completionStrategyRef = element.getAttribute(COMPLETION_STRATEGY_REF_ATTRIBUTE); final String completionStrategyMethod = element.getAttribute(COMPLETION_STRATEGY_METHOD_ATTRIBUTE); if (StringUtils.hasText(completionStrategyRef)) { @@ -82,12 +76,7 @@ public class AggregatorParser extends AbstractEndpointParser { builder.addPropertyReference(COMPLETION_STRATEGY_PROPERTY, completionStrategyRef); } } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); + return builder; } private String createCompletionStrategyAdapter(String ref, String method, ParserContext parserContext) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java index e8659c2a04..7ed0aa5efd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/FilterParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.filter.FilterEndpoint; import org.springframework.integration.filter.MethodInvokingSelector; @@ -28,13 +31,10 @@ import org.springframework.integration.filter.MethodInvokingSelector; public class FilterParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return FilterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingSelector.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(FilterEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingSelector.class)); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java index b6f2e529ba..45a9764fa5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java @@ -19,7 +19,7 @@ package org.springframework.integration.config; import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.aggregator.ResequencerEndpoint; /** @@ -27,33 +27,34 @@ import org.springframework.integration.aggregator.ResequencerEndpoint; * * @author Marius Bogoevici */ -public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { +public class ResequencerParser extends AbstractEndpointParser { - public static final String INPUT_CHANNEL_ATTRIBUTE = "input-channel"; + private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; - public static final String OUTPUT_CHANNEL_ATTRIBUTE = "output-channel"; + private static final String SEND_TIMEOUT_ATTRIBUTE = "send-timeout"; - public static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; + private static final String RELEASE_PARTIAL_SEQUENCES = "release-partial-sequences"; + + private static final String SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE = "send-partial-result-on-timeout"; + + private static final String REAPER_INTERVAL_ATTRIBUTE = "reaper-interval"; + + private static final String TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE = "tracked-correlation-id-capacity"; + + private static final String TIMEOUT_ATTRIBUTE = "timeout"; @Override - protected Class getBeanClass(Element element) { - return ResequencerEndpoint.class; - } - - @Override - protected boolean isEligibleAttribute(String attributeName) { - return !INPUT_CHANNEL_ATTRIBUTE.equals(attributeName) - &&!OUTPUT_CHANNEL_ATTRIBUTE.equals(attributeName) - && !DISCARD_CHANNEL_ATTRIBUTE.equals(attributeName) - && super.isEligibleAttribute(attributeName); - } - - @Override - protected void postProcess(BeanDefinitionBuilder builder, Element element) { - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ResequencerEndpoint.class); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, RELEASE_PARTIAL_SEQUENCES); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java index 8aacf80681..11ee4ea885 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/RouterParser.java @@ -20,7 +20,6 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.router.MethodInvokingChannelResolver; import org.springframework.integration.router.RouterEndpoint; @@ -32,18 +31,13 @@ import org.springframework.integration.router.RouterEndpoint; public class RouterParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return RouterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingChannelResolver.class; - } - - @Override - protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + String adapterBeanName = this.parseAdapter(element, parserContext, MethodInvokingChannelResolver.class); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(RouterEndpoint.class); + builder.addConstructorArgReference(adapterBeanName); + builder.addPropertyReference("channelRegistry", MessageBusParser.MESSAGE_BUS_BEAN_NAME); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "default-output-channel"); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java index 9bd8add796..6e87c30d1b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ServiceActivatorParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.message.MessageMappingMethodInvoker; @@ -28,13 +31,11 @@ import org.springframework.integration.message.MessageMappingMethodInvoker; public class ServiceActivatorParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return ServiceActivatorEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MessageMappingMethodInvoker.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ServiceActivatorEndpoint.class); + String constructorArg = this.parseAdapter(element, parserContext, MessageMappingMethodInvoker.class); + builder.addConstructorArgReference(constructorArg); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java index f4abd0e47d..11a1adcde6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SplitterParser.java @@ -18,7 +18,8 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.splitter.MethodInvokingSplitter; import org.springframework.integration.splitter.SplitterEndpoint; @@ -30,18 +31,13 @@ import org.springframework.integration.splitter.SplitterEndpoint; public class SplitterParser extends AbstractEndpointParser { @Override - protected boolean shouldCreateAdapter(Element element) { - return element.hasAttribute("ref"); - } - - @Override - protected Class getEndpointClass() { - return SplitterEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingSplitter.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(SplitterEndpoint.class); + if (element.hasAttribute("ref")) { + String adapterBeanName = this.parseAdapter(element, parserContext, MethodInvokingSplitter.class); + builder.addConstructorArgReference(adapterBeanName); + } + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java index bafeafba18..65fe7f5408 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransformerParser.java @@ -16,7 +16,10 @@ package org.springframework.integration.config; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.transformer.MethodInvokingTransformer; import org.springframework.integration.transformer.TransformerEndpoint; @@ -28,13 +31,10 @@ import org.springframework.integration.transformer.TransformerEndpoint; public class TransformerParser extends AbstractEndpointParser { @Override - protected Class getEndpointClass() { - return TransformerEndpoint.class; - } - - @Override - protected Class getMethodInvokingAdapterClass() { - return MethodInvokingTransformer.class; + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransformerEndpoint.class); + builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingTransformer.class)); + return builder; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index a890d865cd..7cc214c57b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -19,15 +19,21 @@ package org.springframework.integration.config.annotation; import java.lang.annotation.Annotation; import java.lang.reflect.Method; +import org.springframework.beans.factory.InitializingBean; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.integration.annotation.Poller; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -58,55 +64,73 @@ public abstract class AbstractMethodAnnotationPostProcessor pollerFuture; - - private volatile TaskExecutor taskExecutor; - - private volatile int maxMessagesPerPoll = -1; + protected final Log logger = LogFactory.getLog(this.getClass()); private volatile ErrorHandler errorHandler; - private volatile boolean initialized; - - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - - - public void setInputChannel(MessageChannel inputChannel) { - this.inputChannel = inputChannel; - } - - public void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public void setTaskExecutor(TaskExecutor taskExecutor) { - this.taskExecutor = taskExecutor; - } /** * Provide an error handler for any Exceptions that occur - * upon invocation of this endpoint. If none is provided, + * upon invocation of this consumer. If none is provided, * the Exception messages will be logged (at warn level), * and the Exception rethrown. */ @@ -82,94 +48,11 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint this.errorHandler = errorHandler; } - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - if (this.poller != null) { - this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); - } - } - - public final boolean isRunning() { - return this.running; - } - - @Override - protected void initialize() throws Exception { - synchronized (this.lifecycleMonitor) { - if (this.inputChannel instanceof PollableChannel && this.poller == null) { - this.poller = new ChannelPoller((PollableChannel) this.inputChannel, this.trigger); - this.poller.setMaxMessagesPerPoll(this.maxMessagesPerPoll); - this.configureTransactionSettingsForPoller(this.poller); - if (this.taskExecutor != null) { - this.poller.setTaskExecutor(this.taskExecutor); - } - this.poller.setConsumer(this); - } - this.initialized = true; - } - } - - public final void start() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - return; - } - if (!this.initialized) { - this.afterPropertiesSet(); - } - Assert.notNull(this.inputChannel, "failed to start endpoint, inputChannel is required"); - if (this.inputChannel instanceof SubscribableChannel) { - ((SubscribableChannel) inputChannel).subscribe(this); - } - else if (this.inputChannel instanceof PollableChannel) { - Assert.notNull(this.getTaskScheduler(), - "failed to start endpoint, no taskScheduler available"); - this.pollerFuture = this.getTaskScheduler().schedule(this.poller, this.poller.getTrigger()); - } - onStart(); - this.running = true; - } - } - - public final void stop() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - return; - } - if (this.inputChannel instanceof SubscribableChannel) { - ((SubscribableChannel) inputChannel).unsubscribe(this); - } - else if (this.pollerFuture != null) { - this.pollerFuture.cancel(true); - } - onStop(); - this.running = false; - } - } - - /** - * Subclasses might override this to supply their own start code (e.g. if they start threads - * on their own). This method will be called within the lifecycleMonitor. - */ - protected void onStart() { - - } - - /** - * Subclasses might override this to supply their own stop code (e.g. if they stop threads - * on their own).This method will be called within the lifecycleMonitor. - * - */ - protected void onStop() { - - } - public final void onMessage(Message message) { - if (message == null || message.getPayload() == null) { - throw new IllegalArgumentException("Message and its payload must not be null"); - } + Assert.notNull(message == null, "Message must not be null"); + Assert.notNull(message.getPayload(), "Message payload must not be null"); if (this.logger.isDebugEnabled()) { - this.logger.debug("endpoint '" + this + "' processing message: " + message); + this.logger.debug("consumer '" + this + "' processing message: " + message); } try { this.onMessageInternal(message); @@ -180,7 +63,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint } else { this.handleException(new MessageHandlingException(message, - "failure occurred in endpoint '" + this.toString() + "'", e)); + "failure occurred in consumer '" + this.toString() + "'", e)); } } } @@ -188,7 +71,7 @@ public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint protected void handleException(MessagingException exception) { if (this.errorHandler == null) { if (this.logger.isWarnEnabled()) { - this.logger.warn("exception occurred in endpoint '" + this + "'", exception); + this.logger.warn("exception occurred in consumer '" + this + "'", exception); } throw exception; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java index d4813025ff..7eea6b5dd9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageHandlingEndpoint.java @@ -22,6 +22,7 @@ import java.util.List; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.message.CompositeMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -30,6 +31,7 @@ import org.springframework.integration.message.MessageHeaders; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.selector.MessageSelector; +import org.springframework.util.Assert; /** * @author Mark Fisher @@ -44,6 +46,8 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon private volatile boolean requiresReply = false; + private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); + public void setOutputChannel(MessageChannel outputChannel) { this.outputChannel = outputChannel; @@ -74,7 +78,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon Object result = this.handle(message); if (result == null) { if (this.requiresReply) { - throw new MessageHandlingException(message, "endpoint '" + this + throw new MessageHandlingException(message, "consumer '" + this + "' requires a reply, but no reply was received"); } return; @@ -105,7 +109,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon protected boolean supports(Message message) { if (this.selector != null && !this.selector.accept(message)) { if (logger.isDebugEnabled()) { - logger.debug("selector for endpoint '" + this + "' rejected message: " + message); + logger.debug("selector for consumer '" + this + "' rejected message: " + message); } return false; } @@ -117,7 +121,7 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon } private boolean sendReplyMessage(Message replyMessage, MessageChannel replyChannel) { - return this.getChannelTemplate().send(replyMessage, replyChannel); + return this.channelTemplate.send(replyMessage, replyChannel); } private Message buildReplyMessage(Object result, MessageHeaders requestHeaders) { @@ -153,9 +157,9 @@ public abstract class AbstractMessageHandlingEndpoint extends AbstractMessageCon replyChannel = (MessageChannel) returnAddress; } else if (returnAddress instanceof String) { - if (this.channelRegistry != null) { - replyChannel = this.channelRegistry.lookupChannel((String) returnAddress); - } + Assert.state(this.channelRegistry != null, + "ChannelRegistry is required for resolving a reply channel by name"); + replyChannel = this.channelRegistry.lookupChannel((String) returnAddress); } else { throw new MessagingException("expected a MessageChannel or String for 'returnAddress', but type is [" diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index bb9cb711fd..185af50eff 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -140,6 +140,9 @@ public abstract class AbstractPollingEndpoint implements MessageEndpoint, TaskSc if (!this.initialized) { this.afterPropertiesSet(); } + if (this.isRunning()) { + return; + } Assert.state(this.taskScheduler != null, "unable to start polling, no taskScheduler available"); this.runningTask = this.taskScheduler.schedule(new Poller(), this.trigger); @@ -176,7 +179,7 @@ public abstract class AbstractPollingEndpoint implements MessageEndpoint, TaskSc private void poll() { int count = 0; - while (maxMessagesPerPoll < 0 || count < maxMessagesPerPoll) { + while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) { if (!innerPoll()) { break; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java index 7e7ab5fea0..29f9268232 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java @@ -31,7 +31,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint { +public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint implements InitializingBean { private final MethodResolver methodResolver = new DefaultMethodResolver(ServiceActivator.class); @@ -56,9 +56,7 @@ public class ServiceActivatorEndpoint extends AbstractMessageHandlingEndpoint { } - @Override - protected void initialize() throws Exception { - super.initialize(); + public void afterPropertiesSet() throws Exception { if (this.invoker instanceof InitializingBean) { ((InitializingBean) this.invoker).afterPropertiesSet(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java index 6c25d1b9a6..52f0ece4a1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/config/ConsumerEndpointFactoryBean.java @@ -22,6 +22,8 @@ import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; @@ -39,7 +41,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, InitializingBean { +public class ConsumerEndpointFactoryBean implements FactoryBean, ChannelRegistryAware, BeanFactoryAware, InitializingBean { private final MessageConsumer consumer; @@ -76,6 +78,16 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar this.inputChannelName = inputChannelName; } + public void setChannelRegistry(ChannelRegistry channelRegistry) { + if (this.consumer instanceof ChannelRegistryAware) { + ((ChannelRegistryAware) this.consumer).setChannelRegistry(channelRegistry); + } + } + + public void setTrigger(Trigger trigger) { + this.trigger = trigger; + } + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { this.maxMessagesPerPoll = maxMessagesPerPoll; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java index 966f0b81ce..73f7a6522b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java @@ -21,8 +21,14 @@ import org.springframework.integration.bus.MessageBusAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.channel.SubscribableChannel; +import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.MessagingGateway; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.util.Assert; @@ -42,7 +48,7 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); - private volatile ReplyMessageCorrelator replyMessageCorrelator; + private volatile MessageEndpoint replyMessageCorrelator; private volatile MessageBus messageBus; @@ -145,10 +151,23 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess return; } Assert.state(this.messageBus != null, "No MessageBus available. Cannot register ReplyMessageCorrelator."); - ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(); - correlator.setBeanName("internal.correlator." + this); - correlator.setInputChannel(this.replyChannel); - correlator.afterPropertiesSet(); + MessageEndpoint correlator = null; + MessageConsumer consumer = new AbstractMessageHandlingEndpoint() { + @Override + protected Object handle(Message message) { + return message; + } + }; + if (this.replyChannel instanceof SubscribableChannel) { + correlator = new SubscribingConsumerEndpoint( + consumer, (SubscribableChannel) this.replyChannel); + } + else if (this.replyChannel instanceof PollableChannel) { + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint( + consumer, (PollableChannel) this.replyChannel); + endpoint.afterPropertiesSet(); + correlator = endpoint; + } this.messageBus.registerEndpoint(correlator); this.replyMessageCorrelator = correlator; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java index 56521ac4cb..b20e4c378c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java @@ -21,6 +21,7 @@ import java.util.Collection; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; @@ -37,6 +38,8 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements private volatile boolean resolutionRequired; + private final MessageChannelTemplate channelTemplate = new MessageChannelTemplate(); + public RouterEndpoint(ChannelResolver channelResolver) { Assert.notNull(channelResolver, "ChannelResolver must not be null"); @@ -59,7 +62,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements * default, there is no timeout, meaning the send will block indefinitely. */ public void setTimeout(long timeout) { - this.getChannelTemplate().setSendTimeout(timeout); + this.channelTemplate.setSendTimeout(timeout); } /** @@ -79,7 +82,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements if (results != null) { for (MessageChannel channel : results) { if (channel != null) { - if (this.getChannelTemplate().send(message, channel)) { + if (this.channelTemplate.send(message, channel)) { sent = true; } } @@ -87,7 +90,7 @@ public class RouterEndpoint extends AbstractMessageConsumingEndpoint implements } if (!sent) { if (this.defaultOutputChannel != null) { - sent = this.getChannelTemplate().send(message, this.defaultOutputChannel); + sent = this.channelTemplate.send(message, this.defaultOutputChannel); } else if (this.resolutionRequired) { throw new MessageDeliveryException(message, diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java index 559258dcb9..085cf60afd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/config/AbstractTransformerParser.java @@ -22,7 +22,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.config.AbstractEndpointParser; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.transformer.Transformer; import org.springframework.integration.transformer.TransformerEndpoint; @@ -32,23 +31,15 @@ import org.springframework.integration.transformer.TransformerEndpoint; public abstract class AbstractTransformerParser extends AbstractEndpointParser { @Override - protected boolean shouldCreateAdapter(Element element) { - return false; - } - - @Override - protected Class getEndpointClass() { - return TransformerEndpoint.class; - } - - @Override - protected void postProcess(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransformerEndpoint.class); BeanDefinitionBuilder transformerBuilder = BeanDefinitionBuilder.genericBeanDefinition(this.getTransformerClass()); this.parseTransformer(element, parserContext, transformerBuilder); String transformerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( transformerBuilder.getBeanDefinition(), parserContext.getRegistry()); builder.addConstructorArgReference(transformerBeanName); + return builder; } protected abstract Class getTransformerClass(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java index 9396d85beb..9db00b2cb4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java @@ -52,7 +52,7 @@ public class AggregatorEndpointTests { this.aggregator = new AggregatorEndpoint(new TestAggregator()); this.aggregator.setTaskScheduler(this.taskScheduler); this.taskScheduler.start(); - this.aggregator.onStart(); + this.aggregator.start(); } @Test @@ -325,7 +325,7 @@ public class AggregatorEndpointTests { @After public void stopTaskScheduler() { this.taskScheduler.stop(); - this.aggregator.onStop(); + this.aggregator.stop(); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java index 43f186a1ca..624d1e07ac 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java @@ -46,7 +46,7 @@ public class ResequencerEndpointTests { this.taskScheduler = Schedulers.createDefaultTaskScheduler(10); this.resequencer.setTaskScheduler(taskScheduler); taskScheduler.start(); - this.resequencer.onStart(); + this.resequencer.start(); } @Test @@ -149,7 +149,7 @@ public class ResequencerEndpointTests { @After public void stopTaskScheduler() { - this.resequencer.onStop(); + this.resequencer.stop(); this.taskScheduler.stop(); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 3cf7dc42bd..0e065674b8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -36,7 +36,9 @@ import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.ErrorMessage; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -62,17 +64,18 @@ public class DefaultMessageBusTests { Message message = MessageBuilder.withPayload("test") .setReturnAddress("targetChannel").build(); sourceChannel.send(message); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return message; } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(sourceChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, sourceChannel); + endpoint.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); context.refresh(); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); + consumer.setChannelRegistry(bus); bus.start(); Message result = targetChannel.receive(3000); assertEquals("test", result.getPayload()); @@ -116,12 +119,12 @@ public class DefaultMessageBusTests { QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); - AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return MessageBuilder.fromMessage(message).build(); } }; - AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return MessageBuilder.fromMessage(message).build(); } @@ -132,12 +135,12 @@ public class DefaultMessageBusTests { context.getBeanFactory().registerSingleton("input", inputChannel); context.getBeanFactory().registerSingleton("output1", outputChannel1); context.getBeanFactory().registerSingleton("output2", outputChannel2); - endpoint1.setBeanName("testEndpoint1"); - endpoint1.setInputChannel(inputChannel); - endpoint1.setOutputChannel(outputChannel1); - endpoint2.setBeanName("testEndpoint2"); - endpoint2.setInputChannel(inputChannel); - endpoint2.setOutputChannel(outputChannel2); + consumer1.setOutputChannel(outputChannel1); + consumer2.setOutputChannel(outputChannel2); + PollingConsumerEndpoint endpoint1 = new PollingConsumerEndpoint(consumer1, inputChannel); + endpoint1.afterPropertiesSet(); + PollingConsumerEndpoint endpoint2 = new PollingConsumerEndpoint(consumer2, inputChannel); + endpoint2.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); DefaultMessageBus bus = new DefaultMessageBus(); @@ -157,14 +160,14 @@ public class DefaultMessageBusTests { QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); final CountDownLatch latch = new CountDownLatch(2); - AbstractMessageHandlingEndpoint endpoint1 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer1 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { Message reply = MessageBuilder.fromMessage(message).build(); latch.countDown(); return reply; } }; - AbstractMessageHandlingEndpoint endpoint2 = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer2 = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { Message reply = MessageBuilder.fromMessage(message).build(); latch.countDown(); @@ -177,12 +180,10 @@ public class DefaultMessageBusTests { context.getBeanFactory().registerSingleton("input", inputChannel); context.getBeanFactory().registerSingleton("output1", outputChannel1); context.getBeanFactory().registerSingleton("output2", outputChannel2); - endpoint1.setBeanName("testEndpoint1"); - endpoint1.setInputChannel(inputChannel); - endpoint1.setOutputChannel(outputChannel1); - endpoint2.setBeanName("testEndpoint2"); - endpoint2.setInputChannel(inputChannel); - endpoint2.setOutputChannel(outputChannel2); + consumer1.setOutputChannel(outputChannel1); + consumer2.setOutputChannel(outputChannel2); + SubscribingConsumerEndpoint endpoint1 = new SubscribingConsumerEndpoint(consumer1, inputChannel); + SubscribingConsumerEndpoint endpoint2 = new SubscribingConsumerEndpoint(consumer2, inputChannel); context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); DefaultMessageBus bus = new DefaultMessageBus(); @@ -246,14 +247,14 @@ public class DefaultMessageBusTests { errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); context.getBeanFactory().registerSingleton(ChannelRegistry.ERROR_CHANNEL_NAME, errorChannel); final CountDownLatch latch = new CountDownLatch(1); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { latch.countDown(); return null; } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(errorChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, errorChannel); + endpoint.afterPropertiesSet(); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 5d0c1054b5..f793e53567 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -31,11 +31,10 @@ import org.springframework.integration.channel.ThreadLocalChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageMappingMethodInvoker; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.util.MethodInvoker; /** * @author Mark Fisher @@ -61,11 +60,9 @@ public class DirectChannelSubscriptionTests { @Test public void testSendAndReceiveForRegisteredEndpoint() { GenericApplicationContext context = new GenericApplicationContext(); - MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle"); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker); - endpoint.setInputChannel(sourceChannel); - endpoint.setOutputChannel(targetChannel); - endpoint.setBeanName("testEndpoint"); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "handle"); + serviceActivator.setOutputChannel(targetChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); bus.setApplicationContext(context); bus.start(); @@ -96,14 +93,13 @@ public class DirectChannelSubscriptionTests { QueueChannel errorChannel = new QueueChannel(); errorChannel.setBeanName(ChannelRegistry.ERROR_CHANNEL_NAME); bus.registerChannel(errorChannel); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { throw new RuntimeException("intentional test failure"); } }; - endpoint.setInputChannel(sourceChannel); - endpoint.setOutputChannel(targetChannel); - endpoint.setBeanName("testEndpoint"); + consumer.setOutputChannel(targetChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel); bus.registerEndpoint(endpoint); bus.start(); this.sourceChannel.send(new StringMessage("foo")); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml index c7468ea49d..a2aee37bee 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml @@ -10,9 +10,13 @@ - + + + + + + - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java index 33362f17fc..1987d78303 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.endpoint.AbstractMessageHandlingEndpoint; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; @@ -46,13 +47,13 @@ public class MessageChannelTemplateTests { public void setUp() { this.requestChannel = new QueueChannel(); this.requestChannel.setBeanName("requestChannel"); - AbstractMessageHandlingEndpoint endpoint = new AbstractMessageHandlingEndpoint() { + AbstractMessageHandlingEndpoint consumer = new AbstractMessageHandlingEndpoint() { public Message handle(Message message) { return new StringMessage(message.getPayload().toString().toUpperCase()); } }; - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(requestChannel); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(consumer, requestChannel); + endpoint.afterPropertiesSet(); GenericApplicationContext context = new GenericApplicationContext(); context.getBeanFactory().registerSingleton("requestChannel", requestChannel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 861795a16a..56253a0957 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -28,11 +28,11 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.AggregatorEndpoint; import org.springframework.integration.aggregator.CompletionStrategy; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.util.MethodInvoker; @@ -53,15 +53,14 @@ public class AggregatorParserTests { @Test public void testAggregation() { - AggregatorEndpoint endpoint = - (AggregatorEndpoint) context.getBean("aggregatorWithReference"); + MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput"); TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 1, null)); outboundMessages.add(createMessage("789", "id1", 3, 3, null)); outboundMessages.add(createMessage("456", "id1", 3, 2, null)); for (Message message : outboundMessages) { - endpoint.onMessage(message); + input.send(message); } Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean .getAggregatedMessages().size()); @@ -72,13 +71,14 @@ public class AggregatorParserTests { @Test public void testPropertyAssignment() throws Exception { - AggregatorEndpoint endpoint = - (AggregatorEndpoint) context.getBean("completelyDefinedAggregator"); + SubscribingConsumerEndpoint endpoint = + (SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator"); TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean"); CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + DirectFieldAccessor accessor = new DirectFieldAccessor( + new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance", testAggregator, accessor.getPropertyValue("aggregator")); Assert.assertEquals( @@ -105,13 +105,13 @@ public class AggregatorParserTests { @Test public void testSimpleJavaBeanAggregator() { List> outboundMessages = new ArrayList>(); - AggregatorEndpoint addingAggregator = - (AggregatorEndpoint) context.getBean("aggregatorWithReferenceAndMethod"); + MessageChannel input = + (MessageChannel) context.getBean("aggregatorWithReferenceAndMethodInput"); outboundMessages.add(createMessage(1l, "id1", 3, 1, null)); outboundMessages.add(createMessage(2l, "id1", 3, 3, null)); outboundMessages.add(createMessage(3l, "id1", 3, 2, null)); for (Message message : outboundMessages) { - addingAggregator.onMessage(message); + input.send(message); } PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message response = outputChannel.receive(); @@ -130,23 +130,24 @@ public class AggregatorParserTests { } @Test - public void testAggregatorWithPojoCompletionStrategy(){ - AggregatorEndpoint aggregatorWithPojoCompletionStrategy = - (AggregatorEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy"); - CompletionStrategy completionStrategy = (CompletionStrategy) - new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy"); + public void testAggregatorWithPojoCompletionStrategy() { + MessageChannel input = (MessageChannel) context.getBean("aggregatorWithPojoCompletionStrategyInput"); + SubscribingConsumerEndpoint endpoint = + (SubscribingConsumerEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy"); + CompletionStrategy completionStrategy = (CompletionStrategy) new DirectFieldAccessor( + new DirectFieldAccessor(endpoint).getPropertyValue("consumer")).getPropertyValue("completionStrategy"); Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter); DirectFieldAccessor completionStrategyAccessor = new DirectFieldAccessor(completionStrategy); MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker"); Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy); Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness")); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(1l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(2l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(3l, "id1", 0 , 0, null)); + input.send(createMessage(1l, "id1", 0 , 0, null)); + input.send(createMessage(2l, "id1", 0 , 0, null)); + input.send(createMessage(3l, "id1", 0 , 0, null)); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message reply = outputChannel.receive(0); Assert.assertNull(reply); - aggregatorWithPojoCompletionStrategy.onMessage(createMessage(5l, "id1", 0 , 0, null)); + input.send(createMessage(5l, "id1", 0 , 0, null)); reply = outputChannel.receive(0); Assert.assertNotNull(reply); Assert.assertEquals(11l, reply.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java index 6e8b3ad814..ff574e0c52 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java @@ -28,7 +28,7 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; -import org.springframework.integration.endpoint.OutboundChannelAdapter; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; import org.springframework.test.context.ContextConfiguration; @@ -50,7 +50,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertNotNull(bus.lookupChannel(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); - assertTrue(adapter instanceof OutboundChannelAdapter); + assertTrue(adapter instanceof SubscribingConsumerEndpoint); TestConsumer consumer = (TestConsumer) this.applicationContext.getBean("consumer"); assertNull(consumer.getLastMessage()); Message message = new StringMessage("test"); @@ -70,7 +70,7 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertNotNull(bus.lookupChannel(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); assertNotNull(adapter); - assertTrue(adapter instanceof OutboundChannelAdapter); + assertTrue(adapter instanceof SubscribingConsumerEndpoint); TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); assertNull(testBean.getMessage()); Message message = new StringMessage("consumer test"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index b32be1fd3b..8d9197a744 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -30,7 +30,6 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; @@ -56,11 +55,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorAccepts() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); + MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); QueueChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.withPayload("test") .setReturnAddress(replyChannel).build(); - endpoint.onMessage(message); + inputChannel.send(message); Message reply = replyChannel.receive(500); assertNotNull(reply); assertEquals("foo", reply.getPayload()); @@ -70,11 +69,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorRejects() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); + MessageChannel inputChannel = (MessageChannel) context.getBean("testChannel"); MessageChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.withPayload(123) .setReturnAddress(replyChannel).build(); - endpoint.onMessage(message); + inputChannel.send(message); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 9a7bbb3a64..6e1fb26847 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -26,9 +26,9 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.ResequencerEndpoint; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -70,8 +70,8 @@ public class ResequencerParserTests { @Test public void testDefaultResequencerProperties() { - ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("defaultResequencer"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("defaultResequencer"); + DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertNull(accessor.getPropertyValue("outputChannel")); Assert.assertNull(accessor.getPropertyValue("discardChannel")); Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value", @@ -92,10 +92,10 @@ public class ResequencerParserTests { @Test public void testPropertyAssignment() throws Exception { - ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("completelyDefinedResequencer"); + SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedResequencer"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + DirectFieldAccessor accessor = new DirectFieldAccessor(new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel", outputChannel, accessor.getPropertyValue("outputChannel")); Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel", diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 815eaf2417..973e9bc8d2 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -9,7 +9,6 @@ - @@ -17,10 +16,12 @@ - + + + + + - - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java index abb2112aee..35329a5296 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/CorrelationIdTests.java @@ -43,9 +43,9 @@ public class CorrelationIdTests { .setCorrelationId(correlationId).build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -57,9 +57,9 @@ public class CorrelationIdTests { Message message = MessageBuilder.withPayload("test").build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -72,9 +72,9 @@ public class CorrelationIdTests { .setCorrelationId("correlationId").build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -89,9 +89,9 @@ public class CorrelationIdTests { .setCorrelationId(correlationId).build(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -103,9 +103,9 @@ public class CorrelationIdTests { Message message = new StringMessage("test"); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setInputChannel(inputChannel); - endpoint.setOutputChannel(outputChannel); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); + serviceActivator.setOutputChannel(outputChannel); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, inputChannel); endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java index ac1ea5f2e1..b0d2a895ec 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorMethodResolutionTests.java @@ -33,11 +33,10 @@ public class ServiceActivatorMethodResolutionTests { @Test public void singleAnnotationMatches() { SingleAnnotationTestBean testBean = new SingleAnnotationTestBean(); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean); QueueChannel outputChannel = new QueueChannel(); - endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); - endpoint.onMessage(new StringMessage("foo")); + serviceActivator.setOutputChannel(outputChannel); + serviceActivator.onMessage(new StringMessage("foo")); Message result = outputChannel.receive(0); assertEquals("FOO", result.getPayload()); } @@ -51,11 +50,10 @@ public class ServiceActivatorMethodResolutionTests { @Test public void singlePublicMethodMatches() { SinglePublicMethodTestBean testBean = new SinglePublicMethodTestBean(); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(testBean); + ServiceActivatorEndpoint serviceActivator = new ServiceActivatorEndpoint(testBean); QueueChannel outputChannel = new QueueChannel(); - endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); - endpoint.onMessage(new StringMessage("foo")); + serviceActivator.setOutputChannel(outputChannel); + serviceActivator.onMessage(new StringMessage("foo")); Message result = outputChannel.receive(0); assertEquals("FOO", result.getPayload()); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java index bfe322cf3f..35698d5647 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/filter/FilterEndpointTests.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.filter.FilterEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -65,9 +66,9 @@ public class FilterEndpointTests { return true; } }); - filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.start(); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel); + endpoint.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -84,9 +85,9 @@ public class FilterEndpointTests { return false; } }); - filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.start(); + SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(filter, inputChannel); + endpoint.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); assertNull(outputChannel.receive(0)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java index 0e3c2ce9b2..e156fc4169 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.PollingConsumerEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; /** @@ -82,9 +83,8 @@ public class MethodInvokingConsumerTests { Message message = new GenericMessage("testing"); channel.send(message); assertNull(queue.poll()); - ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(consumer); - endpoint.setBeanName("testEndpoint"); - endpoint.setInputChannel(channel); + ServiceActivatorEndpoint serivceActivator = new ServiceActivatorEndpoint(consumer); + PollingConsumerEndpoint endpoint = new PollingConsumerEndpoint(serivceActivator, channel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); DefaultMessageBus bus = new DefaultMessageBus(); bus.setApplicationContext(context); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml index 4807ae144e..eb4c9c9193 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/splitter/SplitterIntegrationTests-context.xml @@ -14,12 +14,15 @@ - - + +