diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java
new file mode 100644
index 0000000000..199acef94a
--- /dev/null
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/DefaultMessageAggregator.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.aggregator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.integration.core.Message;
+import org.springframework.integration.message.MessageBuilder;
+
+/**
+ * The Default Message Aggregator implementation that combines a group of
+ * messages into a single message containing a {@link List} of all payloads. The
+ * elements of the List are in order of their receiving. Any MessageHeader value
+ * is ignored except the correlationId.
+ *
+ *
+ * n The default strategy for determining whether a group is complete is based
+ * on the 'sequenceSize' property of the header. Alternatively, a
+ * custom implementation of the {@link CompletionStrategy} may be provided.
+ *
+ *
+ * All considerations regarding timeout and grouping by
+ * correlationId from {@link AbstractMessageBarrierHandler} apply
+ * here as well.
+ *
+ *
+ * @author Alex Peters
+ *
+ */
+public class DefaultMessageAggregator extends AbstractMessageAggregator {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ protected Message> aggregateMessages(List> messages) {
+ List payloads = new ArrayList(messages.size());
+ for (Message> message : messages) {
+ payloads.add(message.getPayload());
+ }
+ return MessageBuilder.withPayload(payloads).build();
+ }
+
+}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java
index 7f19a66736..4586a37b88 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AggregatorParser.java
@@ -20,7 +20,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils;
-
import org.w3c.dom.Element;
/**
@@ -36,9 +35,9 @@ public class AggregatorParser extends AbstractConsumerEndpointParser {
private static final String COMPLETION_STRATEGY_METHOD_ATTRIBUTE = "completion-strategy-method";
- private static final String CORRELATION_STRATEGY_REF_ATTRIBUTE = "correlation-strategy";
+ private static final String CORRELATION_STRATEGY_REF_ATTRIBUTE = "correlation-strategy";
- private static final String CORRELATION_STRATEGY_METHOD_ATTRIBUTE = "correlation-strategy-method";
+ private static final String CORRELATION_STRATEGY_METHOD_ATTRIBUTE = "correlation-strategy-method";
private static final String DISCARD_CHANNEL_ATTRIBUTE = "discard-channel";
@@ -54,57 +53,74 @@ public class AggregatorParser extends AbstractConsumerEndpointParser {
private static final String COMPLETION_STRATEGY_PROPERTY = "completionStrategy";
- private static final String CORRELATION_STRATEGY_PROPERTY = "correlationStrategy";
-
+ private static final String CORRELATION_STRATEGY_PROPERTY = "correlationStrategy";
@Override
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
- BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
- IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator.MethodInvokingAggregator");
+ BeanDefinitionBuilder builder;
String ref = element.getAttribute(REF_ATTRIBUTE);
- if (!StringUtils.hasText(ref)) {
- parserContext.getReaderContext().error("The '" + REF_ATTRIBUTE + "' attribute is required.", element);
+ if (StringUtils.hasText(ref)) {
+ builder = BeanDefinitionBuilder
+ .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE
+ + ".aggregator.MethodInvokingAggregator");
+ builder.addConstructorArgReference(ref);
+ if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) {
+ String method = element.getAttribute(METHOD_ATTRIBUTE);
+ builder.addConstructorArgValue(method);
+ }
}
- builder.addConstructorArgReference(ref);
- if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) {
- String method = element.getAttribute(METHOD_ATTRIBUTE);
- builder.addConstructorArgValue(method);
+ else {
+ builder = BeanDefinitionBuilder
+ .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE
+ + ".aggregator.DefaultMessageAggregator");
}
- IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE);
- IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE);
- IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE);
- IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, REAPER_INTERVAL_ATTRIBUTE);
- IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE);
+ IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element,
+ DISCARD_CHANNEL_ATTRIBUTE);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
+ SEND_TIMEOUT_ATTRIBUTE);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
+ SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
+ REAPER_INTERVAL_ATTRIBUTE);
+ IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
+ TRACKED_CORRELATION_ID_CAPACITY_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, TIMEOUT_ATTRIBUTE);
- this.injectPropertyWithBean(COMPLETION_STRATEGY_REF_ATTRIBUTE, COMPLETION_STRATEGY_METHOD_ATTRIBUTE, COMPLETION_STRATEGY_PROPERTY,
- "CompletionStrategyAdapter", element, builder, parserContext);
- this.injectPropertyWithBean(CORRELATION_STRATEGY_REF_ATTRIBUTE, CORRELATION_STRATEGY_METHOD_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY,
- "CorrelationStrategyAdapter", element, builder, parserContext);
+ this.injectPropertyWithBean(COMPLETION_STRATEGY_REF_ATTRIBUTE,
+ COMPLETION_STRATEGY_METHOD_ATTRIBUTE, COMPLETION_STRATEGY_PROPERTY,
+ "CompletionStrategyAdapter", element, builder, parserContext);
+ this.injectPropertyWithBean(CORRELATION_STRATEGY_REF_ATTRIBUTE,
+ CORRELATION_STRATEGY_METHOD_ATTRIBUTE, CORRELATION_STRATEGY_PROPERTY,
+ "CorrelationStrategyAdapter", element, builder, parserContext);
return builder;
}
- private void injectPropertyWithBean(String beanRefAttribute, String methodRefAttribute, String beanProperty, String adapterClass,
- Element element, BeanDefinitionBuilder builder, ParserContext parserContext) {
- final String beanRef = element.getAttribute(beanRefAttribute);
- final String beanMethod = element.getAttribute(methodRefAttribute);
+ private void injectPropertyWithBean(String beanRefAttribute, String methodRefAttribute,
+ String beanProperty, String adapterClass, Element element,
+ BeanDefinitionBuilder builder, ParserContext parserContext) {
+ final String beanRef = element.getAttribute(beanRefAttribute);
+ final String beanMethod = element.getAttribute(methodRefAttribute);
if (StringUtils.hasText(beanRef)) {
if (StringUtils.hasText(beanMethod)) {
- String adapterBeanName = this.createAdapter(beanRef, beanMethod, adapterClass, parserContext);
+ String adapterBeanName = this.createAdapter(beanRef, beanMethod, adapterClass,
+ parserContext);
builder.addPropertyReference(beanProperty, adapterBeanName);
}
else {
builder.addPropertyReference(beanProperty, beanRef);
}
}
- }
+ }
- private String createAdapter(String ref, String method, String unqualifiedClassName, ParserContext parserContext) {
- BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
- IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator." + unqualifiedClassName);
+ private String createAdapter(String ref, String method, String unqualifiedClassName,
+ ParserContext parserContext) {
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder
+ .genericBeanDefinition(IntegrationNamespaceUtils.BASE_PACKAGE + ".aggregator."
+ + unqualifiedClassName);
builder.addConstructorArgReference(ref);
builder.addConstructorArgValue(method);
- return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
+ return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(),
+ parserContext.getRegistry());
}
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java
new file mode 100644
index 0000000000..45690b0573
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/DefaultMessageAggregatorTests.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.aggregator;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.awt.Button;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.springframework.integration.core.Message;
+
+/**
+ * @author Alex Peters
+ *
+ */
+public class DefaultMessageAggregatorTests {
+
+ DefaultMessageAggregator aggregator;
+
+ @Before
+ public void setUp() {
+ aggregator = new DefaultMessageAggregator();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void aggregateMessages_withMultiplePayloads_allAsListInResultMsg() {
+ List anyPayloads = Arrays.asList("foo", "bar", 123L, new Button());
+ List> messageGroup = new ArrayList>(anyPayloads.size());
+ for (Serializable payload : anyPayloads) {
+ Message mock = mock(Message.class);
+ when(mock.getPayload()).thenReturn(payload);
+ messageGroup.add(mock);
+ }
+ Message> result = aggregator.aggregateMessages(messageGroup);
+ assertThat((List) result.getPayload(), is(anyPayloads));
+ }
+}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml
index bbe2adf73a..b600373a94 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests-context.xml
@@ -6,22 +6,21 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
-
-
+
+
-
+
-
+
-
+
-
-
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java
index fff3b9c3e2..fa35e42fc7 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/ConcurrentAggregatorIntegrationTests.java
@@ -16,7 +16,9 @@
package org.springframework.integration.aggregator.integration;
-import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import java.util.HashMap;
import java.util.List;
@@ -24,9 +26,10 @@ import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.integration.aggregator.AbstractMessageAggregator;
+import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
@@ -36,6 +39,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* @author Iwein Fuld
+ * @author Alex Peters
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@@ -49,10 +53,12 @@ public class ConcurrentAggregatorIntegrationTests {
@Qualifier("output")
private PollableChannel output;
+ @Autowired
+ AbstractMessageAggregator aggregator;
@Test
public void configOk() throws Exception {
- // nothing to assert
+ assertThat(aggregator, is(MethodInvokingAggregator.class));
}
@Test(timeout = 1000)
@@ -61,11 +67,10 @@ public class ConcurrentAggregatorIntegrationTests {
Map headers = stubHeaders(i, 5, 1);
input.send(new GenericMessage(i, headers));
}
- assertEquals(0+1+2+3+4, output.receive().getPayload());
+ assertEquals(0 + 1 + 2 + 3 + 4, output.receive().getPayload());
}
-
- //configured in context associated with this test
+ // configured in context associated with this test
public static class SummingAggregator {
public Integer sum(List numbers) {
int result = 0;
@@ -76,7 +81,6 @@ public class ConcurrentAggregatorIntegrationTests {
}
}
-
private Map stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) {
Map headers = new HashMap();
headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml
new file mode 100644
index 0000000000..804cb231d4
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests-context.xml
@@ -0,0 +1,29 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java
new file mode 100644
index 0000000000..150818be81
--- /dev/null
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/integration/DefaultMessageAggregatorIntegrationTests.java
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.integration.aggregator.integration;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.integration.aggregator.AbstractMessageAggregator;
+import org.springframework.integration.aggregator.DefaultMessageAggregator;
+import org.springframework.integration.channel.PollableChannel;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.core.MessageHeaders;
+import org.springframework.integration.message.GenericMessage;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+/**
+ * @author Alex Peters
+ * @author Iwein Fuld
+ *
+ */
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration
+public class DefaultMessageAggregatorIntegrationTests {
+
+ @Autowired
+ @Qualifier("input")
+ MessageChannel input;
+
+ @Autowired
+ @Qualifier("output")
+ PollableChannel output;
+
+ @Autowired
+ AbstractMessageAggregator aggregator;
+
+ @Test
+ public void configOk() throws Exception {
+ assertThat(aggregator, is(DefaultMessageAggregator.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 1000)
+ public void aggregate() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ Map headers = stubHeaders(i, 5, 1);
+ input.send(new GenericMessage(i, headers));
+ }
+ Object payload = output.receive().getPayload();
+ assertThat(payload, is(List.class));
+ assertThat((List) payload, is(Arrays.asList(0, 1, 2, 3, 4)));
+ }
+
+ Map stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) {
+ Map headers = new HashMap();
+ headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
+ headers.put(MessageHeaders.SEQUENCE_SIZE, sequenceSize);
+ headers.put(MessageHeaders.CORRELATION_ID, correllationId);
+ return headers;
+ }
+
+}