diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java index d3aa8d53d7..3227929228 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java @@ -18,6 +18,7 @@ package org.springframework.integration.config.xml; import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; @@ -29,6 +30,7 @@ import org.springframework.util.StringUtils; * * @author Marius Bogoevici * @author Mark Fisher + * @author Oleg Zhurakousky */ public class AggregatorParser extends AbstractConsumerEndpointParser { @@ -59,21 +61,30 @@ public class AggregatorParser extends AbstractConsumerEndpointParser { @Override protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { - BeanDefinitionBuilder builder; + BeanDefinition innerHandlerDefinition = this.parseInnerHandlerDefinition(element, parserContext); String ref = element.getAttribute(REF_ATTRIBUTE); - if (StringUtils.hasText(ref)) { + BeanDefinitionBuilder builder; + + if (innerHandlerDefinition != null || StringUtils.hasText(ref)){ builder = BeanDefinitionBuilder.genericBeanDefinition( IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator.MethodInvokingAggregator"); - builder.addConstructorArgReference(ref); - if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { - String method = element.getAttribute(METHOD_ATTRIBUTE); - builder.getRawBeanDefinition().getConstructorArgumentValues().addGenericArgumentValue(method, "java.lang.String"); - } - } - else { + } else { builder = BeanDefinitionBuilder.genericBeanDefinition( IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator.DefaultMessageAggregator"); } + + if (innerHandlerDefinition != null){ + builder.addConstructorArgValue(innerHandlerDefinition); + } else { + if (StringUtils.hasText(ref)) { + builder.addConstructorArgReference(ref); + } + } + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + String method = element.getAttribute(METHOD_ATTRIBUTE); + builder.getRawBeanDefinition().getConstructorArgumentValues().addGenericArgumentValue(method, "java.lang.String"); + } + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index d815694f17..e54c58409d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -967,7 +967,10 @@ - + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/InnerDefinitionHandlerAwareEndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/InnerDefinitionHandlerAwareEndpointParserTests.java index 324f91829d..a5af3caa8d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/InnerDefinitionHandlerAwareEndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/InnerDefinitionHandlerAwareEndpointParserTests.java @@ -15,8 +15,13 @@ */ package org.springframework.integration.config.xml; +import static org.junit.Assert.assertEquals; + import java.io.ByteArrayInputStream; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Properties; import junit.framework.Assert; @@ -31,7 +36,10 @@ import org.springframework.core.io.InputStreamResource; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.core.Message; +import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.core.MessageHeaders; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.MessageBuilder; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @@ -130,6 +138,32 @@ public class InnerDefinitionHandlerAwareEndpointParserTests { reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD); reader.loadBeanDefinitions(new InputStreamResource(stream)); } + + @Test + public void testInnerAggregatorDefinitionSuccess(){ + String configProperty = testConfigurations.getProperty("aggregator-inner-success"); + this.testAggregatorDefinitionSuccess(configProperty); + } + @Test + public void testInnerConcurrentAggregatorDefinitionSuccess(){ + String configProperty = testConfigurations.getProperty("aggregator-inner-concurrent-success"); + this.testAggregatorDefinitionSuccess(configProperty); + } + @Test + public void testRefAggregatorDefinitionSuccess(){ + String configProperty = testConfigurations.getProperty("aggregator-ref-success"); + this.testAggregatorDefinitionSuccess(configProperty); + } + @Test(expected=BeanDefinitionStoreException.class) + public void testInnerAggregatorDefinitionFailureRefAndInner(){ + String xmlConfig = testConfigurations.getProperty("aggregator-failure-refAndBean"); + ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes()); + GenericApplicationContext ac = new GenericApplicationContext(); + XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac); + reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD); + reader.loadBeanDefinitions(new InputStreamResource(stream)); + } + private void testSplitterDefinitionSuccess(String configProperty){ ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes()); GenericApplicationContext ac = new GenericApplicationContext(); @@ -199,6 +233,33 @@ public class InnerDefinitionHandlerAwareEndpointParserTests { PollableChannel channel1 = (PollableChannel) ac.getBean("outChannel"); Assert.assertTrue(channel1.receive().getPayload().equals("1")); } + private void testAggregatorDefinitionSuccess(String configProperty){ + ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes()); + GenericApplicationContext ac = new GenericApplicationContext(); + XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac); + reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD); + reader.loadBeanDefinitions(new InputStreamResource(stream)); + ac.refresh(); + ac.start(); + MessageChannel inChannel = (MessageChannel) ac.getBean("inChannel"); + for (int i = 0; i < 5; i++) { + Map headers = stubHeaders(i, 5, 1); + Message message = new GenericMessage(i, headers); + inChannel.send(message); + } + PollableChannel output = (PollableChannel) ac.getBean("outChannel"); + assertEquals(0 + 1 + 2 + 3 + 4, output.receive().getPayload()); + System.out.println(); + } + + private Map stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) { + Map headers = new HashMap(); + headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber); + headers.put(MessageHeaders.SEQUENCE_SIZE, sequenceSize); + headers.put(MessageHeaders.CORRELATION_ID, correllationId); + headers.put(MessageHeaders.ID, 1); + return headers; + } public static class TestSplitter{ public Collection split(String[] payload){ @@ -222,5 +283,15 @@ public class InnerDefinitionHandlerAwareEndpointParserTests { return value; } } + + public static class TestAggregator{ + public Integer sum(List numbers) { + int result = 0; + for (Integer number : numbers) { + result += number; + } + return result; + } + } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/innerdefaware.properties b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/innerdefaware.properties index a0229f8e74..88d1c7160a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/innerdefaware.properties +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/innerdefaware.properties @@ -192,4 +192,75 @@ sa-failure-refAndBean=\ \ \ \ + +aggregator-inner-success=\ + \ + \ + \ + \ + \ + \ + \ + \ + \ + +aggregator-ref-success=\ + \ + \ + \ + \ + \ + \ + \ + \ + \ + +aggregator-inner-concurrent-success=\ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ + +aggregator-failure-refAndBean=\ + \ + \ + \ + \ + \ + \ + \ + \ + \ + \ \ No newline at end of file