From 399eee83b10441dbb107e1bb7ea3e1ecf53a07fd Mon Sep 17 00:00:00 2001 From: Iwein Fuld Date: Wed, 20 May 2009 14:56:36 +0000 Subject: [PATCH] IN PROGRESS - issue INT-642: Add DefaultAggregator http://jira.springframework.org/browse/INT-642 Polished Alex' patch --- .../aggregator/DefaultMessageAggregator.java | 58 +++++++++++++ .../config/xml/AggregatorParser.java | 80 +++++++++++------- .../DefaultMessageAggregatorTests.java | 59 +++++++++++++ ...rentAggregatorIntegrationTests-context.xml | 17 ++-- .../ConcurrentAggregatorIntegrationTests.java | 18 ++-- ...sageAggregatorIntegrationTests-context.xml | 29 +++++++ ...aultMessageAggregatorIntegrationTests.java | 84 +++++++++++++++++++ 7 files changed, 297 insertions(+), 48 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java new file mode 100644 index 0000000000..199acef94a --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.aggregator; + +import java.util.ArrayList; +import java.util.List; + +import org.springframework.integration.core.Message; +import org.springframework.integration.message.MessageBuilder; + +/** + * The Default Message Aggregator implementation that combines a group of + * messages into a single message containing a {@link List} of all payloads. The + * elements of the List are in order of their receiving. Any MessageHeader value + * is ignored except the correlationId. + * + *

+ * n The default strategy for determining whether a group is complete is based + * on the 'sequenceSize' property of the header. Alternatively, a + * custom implementation of the {@link CompletionStrategy} may be provided. + *

+ *

+ * All considerations regarding timeout and grouping by + * correlationId from {@link AbstractMessageBarrierHandler} apply + * here as well. + *

+ * + * @author Alex Peters + * + */ +public class DefaultMessageAggregator extends AbstractMessageAggregator { + + /** + * {@inheritDoc} + */ + @Override + protected Message aggregateMessages(List> messages) { + List payloads = new ArrayList(messages.size()); + for (Message message : messages) { + payloads.add(message.getPayload()); + } + return MessageBuilder.withPayload(payloads).build(); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java index 7f19a66736..4586a37b88 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java @@ -20,7 +20,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.util.StringUtils; - import org.w3c.dom.Element; /** @@ -36,9 +35,9 @@ public class AggregatorParser extends AbstractConsumerEndpointParser { private static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method"; - private static final String CORRELATION_STRATEGY_REF_ATTRIBUTE = "correlation-strategy"; + private static final String CORRELATION_STRATEGY_REF_ATTRIBUTE = "correlation-strategy"; - private static final String CORRELATION_STRATEGY_METHOD_ATTRIBUTE = "correlation-strategy-method"; + private static final String CORRELATION_STRATEGY_METHOD_ATTRIBUTE = "correlation-strategy-method"; private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel"; @@ -54,57 +53,74 @@ public class AggregatorParser extends AbstractConsumerEndpointParser { private static final String COMPLETION_STRATEGY_PROPERTY = "completionStrategy"; - private static final String CORRELATION_STRATEGY_PROPERTY = "correlationStrategy"; - + private static final String CORRELATION_STRATEGY_PROPERTY = "correlationStrategy"; @Override protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( - IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator.MethodInvokingAggregator"); + BeanDefinitionBuilder builder; String ref = element.getAttribute(REF_ATTRIBUTE); - if (!StringUtils.hasText(ref)) { - parserContext.getReaderContext().error("The '" + REF_ATTRIBUTE + "' attribute is required.", element); + if (StringUtils.hasText(ref)) { + builder = BeanDefinitionBuilder + .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE + + ".aggregator.MethodInvokingAggregator"); + builder.addConstructorArgReference(ref); + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + String method = element.getAttribute(METHOD_ATTRIBUTE); + builder.addConstructorArgValue(method); + } } - builder.addConstructorArgReference(ref); - if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { - String method = element.getAttribute(METHOD_ATTRIBUTE); - builder.addConstructorArgValue(method); + else { + builder = BeanDefinitionBuilder + .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE + + ".aggregator.DefaultMessageAggregator"); } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, + DISCARD_CHANNEL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + SEND_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + REAPER_INTERVAL_ATTRIBUTE); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, + TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup"); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE); - this.injectPropertyWithBean(COMPLETION_STRATEGY_REF_ATTRIBUTE, COMPLETION_STRATEGY_METHOD_ATTRIBUTE, COMPLETION_STRATEGY_PROPERTY, - "CompletionStrategyAdapter", element, builder, parserContext); - this.injectPropertyWithBean(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY, - "CorrelationStrategyAdapter", element, builder, parserContext); + this.injectPropertyWithBean(COMPLETION_STRATEGY_REF_ATTRIBUTE, + COMPLETION_STRATEGY_METHOD_ATTRIBUTE, COMPLETION_STRATEGY_PROPERTY, + "CompletionStrategyAdapter", element, builder, parserContext); + this.injectPropertyWithBean(CORRELATION_STRATEGY_REF_ATTRIBUTE, + CORRELATION_STRATEGY_METHOD_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY, + "CorrelationStrategyAdapter", element, builder, parserContext); return builder; } - private void injectPropertyWithBean(String beanRefAttribute, String methodRefAttribute, String beanProperty, String adapterClass, - Element element, BeanDefinitionBuilder builder, ParserContext parserContext) { - final String beanRef = element.getAttribute(beanRefAttribute); - final String beanMethod = element.getAttribute(methodRefAttribute); + private void injectPropertyWithBean(String beanRefAttribute, String methodRefAttribute, + String beanProperty, String adapterClass, Element element, + BeanDefinitionBuilder builder, ParserContext parserContext) { + final String beanRef = element.getAttribute(beanRefAttribute); + final String beanMethod = element.getAttribute(methodRefAttribute); if (StringUtils.hasText(beanRef)) { if (StringUtils.hasText(beanMethod)) { - String adapterBeanName = this.createAdapter(beanRef, beanMethod, adapterClass, parserContext); + String adapterBeanName = this.createAdapter(beanRef, beanMethod, adapterClass, + parserContext); builder.addPropertyReference(beanProperty, adapterBeanName); } else { builder.addPropertyReference(beanProperty, beanRef); } } - } + } - private String createAdapter(String ref, String method, String unqualifiedClassName, ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( - IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator." + unqualifiedClassName); + private String createAdapter(String ref, String method, String unqualifiedClassName, + ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder + .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator." + + unqualifiedClassName); builder.addConstructorArgReference(ref); builder.addConstructorArgValue(method); - return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); + return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), + parserContext.getRegistry()); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java new file mode 100644 index 0000000000..45690b0573 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.aggregator; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.awt.Button; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.integration.core.Message; + +/** + * @author Alex Peters + * + */ +public class DefaultMessageAggregatorTests { + + DefaultMessageAggregator aggregator; + + @Before + public void setUp() { + aggregator = new DefaultMessageAggregator(); + } + + @SuppressWarnings("unchecked") + @Test + public void aggregateMessages_withMultiplePayloads_allAsListInResultMsg() { + List anyPayloads = Arrays.asList("foo", "bar", 123L, new Button()); + List> messageGroup = new ArrayList>(anyPayloads.size()); + for (Serializable payload : anyPayloads) { + Message mock = mock(Message.class); + when(mock.getPayload()).thenReturn(payload); + messageGroup.add(mock); + } + Message result = aggregator.aggregateMessages(messageGroup); + assertThat((List) result.getPayload(), is(anyPayloads)); + } +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml index bbe2adf73a..b600373a94 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml @@ -6,22 +6,21 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - - + + - + - + - + - - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java index fff3b9c3e2..fa35e42fc7 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java @@ -16,7 +16,9 @@ package org.springframework.integration.aggregator.integration; -import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import java.util.HashMap; import java.util.List; @@ -24,9 +26,10 @@ import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.aggregator.AbstractMessageAggregator; +import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessageHeaders; @@ -36,6 +39,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * @author Iwein Fuld + * @author Alex Peters */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration @@ -49,10 +53,12 @@ public class ConcurrentAggregatorIntegrationTests { @Qualifier("output") private PollableChannel output; + @Autowired + AbstractMessageAggregator aggregator; @Test public void configOk() throws Exception { - // nothing to assert + assertThat(aggregator, is(MethodInvokingAggregator.class)); } @Test(timeout = 1000) @@ -61,11 +67,10 @@ public class ConcurrentAggregatorIntegrationTests { Map headers = stubHeaders(i, 5, 1); input.send(new GenericMessage(i, headers)); } - assertEquals(0+1+2+3+4, output.receive().getPayload()); + assertEquals(0 + 1 + 2 + 3 + 4, output.receive().getPayload()); } - - //configured in context associated with this test + // configured in context associated with this test public static class SummingAggregator { public Integer sum(List numbers) { int result = 0; @@ -76,7 +81,6 @@ public class ConcurrentAggregatorIntegrationTests { } } - private Map stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) { Map headers = new HashMap(); headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml new file mode 100644 index 0000000000..804cb231d4 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml @@ -0,0 +1,29 @@ + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java new file mode 100644 index 0000000000..150818be81 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java @@ -0,0 +1,84 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.integration.aggregator.integration; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.aggregator.AbstractMessageAggregator; +import org.springframework.integration.aggregator.DefaultMessageAggregator; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.core.MessageHeaders; +import org.springframework.integration.message.GenericMessage; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Alex Peters + * @author Iwein Fuld + * + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration +public class DefaultMessageAggregatorIntegrationTests { + + @Autowired + @Qualifier("input") + MessageChannel input; + + @Autowired + @Qualifier("output") + PollableChannel output; + + @Autowired + AbstractMessageAggregator aggregator; + + @Test + public void configOk() throws Exception { + assertThat(aggregator, is(DefaultMessageAggregator.class)); + } + + @SuppressWarnings("unchecked") + @Test(timeout = 1000) + public void aggregate() throws Exception { + for (int i = 0; i < 5; i++) { + Map headers = stubHeaders(i, 5, 1); + input.send(new GenericMessage(i, headers)); + } + Object payload = output.receive().getPayload(); + assertThat(payload, is(List.class)); + assertThat((List) payload, is(Arrays.asList(0, 1, 2, 3, 4))); + } + + Map stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) { + Map headers = new HashMap(); + headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber); + headers.put(MessageHeaders.SEQUENCE_SIZE, sequenceSize); + headers.put(MessageHeaders.CORRELATION_ID, correllationId); + return headers; + } + +}