diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java
similarity index 60%
rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java
rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java
index 87a7d3c639..71eef17a21 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java
@@ -20,46 +20,30 @@ import java.util.List;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
-import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.util.Assert;
/**
- * An {@link AbstractMessageBarrierConsumer} that waits for a complete
- * group of {@link Message Messages} to arrive and then delegates to an
- * {@link Aggregator} to combine them into a single {@link Message}.
- *
- * The default strategy for determining whether a group is complete is based on
- * the 'sequenceSize' property of the header. Alternatively, a
+ * A base class for aggregating a group of Messages into a single Message.
+ * Extends {@link AbstractMessageBarrierConsumer} and waits for a
+ * complete group of {@link Message Messages} to arrive. Subclasses
+ * must provide the implementation of the {@link #aggregateMessages(List)}
+ * method to combine the group of Messages into a single {@link Message}.
+ *
+ *
The default strategy for determining whether a group is complete is based
+ * on the 'sequenceSize' property of the header. Alternatively, a
* custom implementation of the {@link CompletionStrategy} may be provided.
- *
- * All considerations regarding timeout and grouping by '
- * correlationId' from {@link AbstractMessageBarrierConsumer}
+ *
+ *
All considerations regarding timeout and grouping by
+ * correlationId from {@link AbstractMessageBarrierConsumer}
* apply here as well.
*
* @author Mark Fisher
* @author Marius Bogoevici
*/
-public class AggregatorEndpoint extends AbstractMessageBarrierConsumer {
-
- private final Aggregator aggregator;
+public abstract class AbstractMessageAggregator extends AbstractMessageBarrierConsumer {
private volatile CompletionStrategy completionStrategy = new SequenceSizeCompletionStrategy();
- /**
- * Create an endpoint that delegates to the provided Aggregator to combine a
- * group of messages into a single message. The executor will be used for
- * scheduling a background maintenance thread. If null, a new
- * single-threaded executor will be created.
- */
- public AggregatorEndpoint(Aggregator aggregator, TaskScheduler executor) {
- super();
- Assert.notNull(aggregator, "'aggregator' must not be null");
- this.aggregator = aggregator;
- }
-
- public AggregatorEndpoint(Aggregator aggregator) {
- this(aggregator, null);
- }
/**
* Strategy to determine whether the group of messages is complete.
@@ -78,7 +62,7 @@ public class AggregatorEndpoint extends AbstractMessageBarrierConsumer {
}
protected Message>[] processReleasedMessages(Object correlationId, List> messages) {
- Message> result = aggregator.aggregate(messages);
+ Message> result = this.aggregateMessages(messages);
if (result == null) {
return new Message>[0];
}
@@ -88,4 +72,6 @@ public class AggregatorEndpoint extends AbstractMessageBarrierConsumer {
return new Message>[] { result };
}
+ protected abstract Message> aggregateMessages(List> messages);
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java
deleted file mode 100644
index 7ab2db8ff5..0000000000
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.aggregator;
-
-import java.util.List;
-
-import org.springframework.integration.message.Message;
-
-/**
- * Strategy interface for aggregating a list of {@link Message Messages} into a
- * single {@link Message}.
- *
- * @author Mark Fisher
- */
-public interface Aggregator {
-
- Message> aggregate(List> messages);
-
-}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java
index 7f765b17d3..9ca3ef2504 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java
@@ -35,7 +35,7 @@ import org.springframework.util.ReflectionUtils;
*
* @author Marius Bogoevici
*/
-public abstract class MessageListMethodAdapter {
+public class MessageListMethodAdapter {
private final DefaultMethodInvoker invoker;
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
index f032fd12a3..289bb52f58 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
@@ -19,34 +19,51 @@ package org.springframework.integration.aggregator;
import java.lang.reflect.Method;
import java.util.List;
+import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
+import org.springframework.integration.util.DefaultMethodResolver;
+import org.springframework.integration.util.MethodResolver;
+import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
- * Aggregator adapter for methods annotated with {@link org.springframework.integration.annotation.Aggregator @Aggregator}
- * and for 'aggregator' elements that include a 'method' attribute
- * (e.g. <aggregator ref="beanReference" method="methodName"/>).
+ * {@link AbstractMessageAggregator} adapter for methods annotated with
+ * {@link Aggregator @Aggregator} annotation and for aggregator
+ * elements (e.g. <aggregator ref="beanReference" method="methodName"/>).
*
* @author Marius Bogoevici
* @author Mark Fisher
*/
-public class MethodInvokingAggregator extends MessageListMethodAdapter implements Aggregator {
+public class MethodInvokingAggregator extends AbstractMessageAggregator {
+
+ private final MethodResolver methodResolver = new DefaultMethodResolver(Aggregator.class);
+
+ private final MessageListMethodAdapter methodInvoker;
+
public MethodInvokingAggregator(Object object, Method method) {
- super(object, method);
+ this.methodInvoker = new MessageListMethodAdapter(object, method);
}
public MethodInvokingAggregator(Object object, String methodName) {
- super(object, methodName);
+ this.methodInvoker = new MessageListMethodAdapter(object, methodName);
+ }
+
+ public MethodInvokingAggregator(Object object) {
+ Assert.notNull(object, "object must not be null");
+ Method method = this.methodResolver.findMethod(object.getClass());
+ Assert.notNull(method, "unable to resolve Aggregator method on target class ["
+ + object.getClass() + "]");
+ this.methodInvoker = new MessageListMethodAdapter(object, method);
}
- public Message> aggregate(List> messages) {
+ public Message> aggregateMessages(List> messages) {
if (CollectionUtils.isEmpty(messages)) {
return null;
}
- Object returnedValue = this.executeMethod(messages);
+ Object returnedValue = this.methodInvoker.executeMethod(messages);
if (returnedValue == null) {
return null;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
index 9c817c735a..58bd328e72 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
@@ -22,7 +22,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.springframework.integration.aggregator.AggregatorEndpoint;
+import org.springframework.integration.aggregator.AbstractMessageAggregator;
/**
* Indicates that a method is capable of aggregating messages.
@@ -56,12 +56,12 @@ public @interface Aggregator {
/**
* timeout for sending results to the reply target (in milliseconds)
*/
- long sendTimeout() default AggregatorEndpoint.DEFAULT_SEND_TIMEOUT;
+ long sendTimeout() default AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT;
/**
* maximum time to wait for completion (in milliseconds)
*/
- long timeout() default AggregatorEndpoint.DEFAULT_TIMEOUT;
+ long timeout() default AbstractMessageAggregator.DEFAULT_TIMEOUT;
/**
* indicates whether to send an incomplete aggregate on timeout
@@ -71,13 +71,13 @@ public @interface Aggregator {
/**
* interval for the task that checks for timed-out aggregates
*/
- long reaperInterval() default AggregatorEndpoint.DEFAULT_REAPER_INTERVAL;
+ long reaperInterval() default AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL;
/**
* maximum number of correlation IDs to maintain so that received messages
* may be recognized as belonging to an aggregate that has already completed
* or timed out
*/
- int trackedCorrelationIdCapacity() default AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY;
+ int trackedCorrelationIdCapacity() default AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
index f9ec546bd7..2fc6461913 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
@@ -21,9 +21,9 @@ import org.w3c.dom.Element;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
-import org.springframework.integration.aggregator.AggregatorEndpoint;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.MethodInvokingAggregator;
+import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
@@ -56,8 +56,14 @@ public class AggregatorParser extends AbstractEndpointParser {
@Override
protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) {
- BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(AggregatorEndpoint.class);
- builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingAggregator.class));
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingAggregator.class);
+ String ref = element.getAttribute(REF_ATTRIBUTE);
+ Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required.");
+ builder.addConstructorArgReference(ref);
+ if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) {
+ String method = element.getAttribute(METHOD_ATTRIBUTE);
+ builder.addConstructorArgValue(method);
+ }
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE);
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE);
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
index 99c6c94102..36140b3b73 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
@@ -20,7 +20,7 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.springframework.core.annotation.AnnotationUtils;
-import org.springframework.integration.aggregator.AggregatorEndpoint;
+import org.springframework.integration.aggregator.AbstractMessageAggregator;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.annotation.Aggregator;
@@ -46,8 +46,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP
@Override
protected MessageConsumer createConsumer(Object bean, Method method, Aggregator annotation) {
- MethodInvokingAggregator adapter = new MethodInvokingAggregator(bean, method);
- AggregatorEndpoint aggregator = new AggregatorEndpoint(adapter);
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(bean, method);
this.configureCompletionStrategy(bean, aggregator);
String discardChannelName = annotation.discardChannel();
if (StringUtils.hasText(discardChannelName)) {
@@ -64,7 +63,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP
return aggregator;
}
- private void configureCompletionStrategy(final Object bean, final AggregatorEndpoint aggregator) {
+ private void configureCompletionStrategy(final Object bean, final AbstractMessageAggregator aggregator) {
ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() {
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
Annotation annotation = AnnotationUtils.getAnnotation(method, CompletionStrategy.class);
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
index 9db00b2cb4..85c6782a57 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
@@ -19,6 +19,7 @@ package org.springframework.integration.aggregator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -45,11 +47,12 @@ public class AggregatorEndpointTests {
private final TaskScheduler taskScheduler = Schedulers.createDefaultTaskScheduler(10);
- private AggregatorEndpoint aggregator;
+ private AbstractMessageAggregator aggregator;
+
@Before
public void configureAggregator() {
- this.aggregator = new AggregatorEndpoint(new TestAggregator());
+ this.aggregator = new TestAggregator();
this.aggregator.setTaskScheduler(this.taskScheduler);
this.taskScheduler.start();
this.aggregator.start();
@@ -211,27 +214,26 @@ public class AggregatorEndpointTests {
@Test
public void testNullReturningAggregator() throws InterruptedException {
- NullReturningAggregator nullReturningAggregator = new NullReturningAggregator();
- AggregatorEndpoint aggregator = new AggregatorEndpoint(nullReturningAggregator);
-// aggregator.setTaskScheduler(this.taskScheduler);
+ this.aggregator = new NullReturningAggregator();
+ this.aggregator.setTaskScheduler(this.taskScheduler);
QueueChannel replyChannel = new QueueChannel();
Message> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
- AggregatorTestTask task = new AggregatorTestTask(aggregator, message1, latch);
- this.taskScheduler.execute(task);
+ AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch);
+ this.taskScheduler.execute(task1);
AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch);
this.taskScheduler.execute(task2);
AggregatorTestTask task3 = new AggregatorTestTask(aggregator, message3, latch);
this.taskScheduler.execute(task3);
latch.await(1000, TimeUnit.MILLISECONDS);
- assertNull(task.getException());
+ assertNull(task1.getException());
assertNull(task2.getException());
assertNull(task3.getException());
Message> reply = replyChannel.receive(500);
assertNull(reply);
- assertEquals(true, nullReturningAggregator.isAggregationComplete());
+ assertTrue(((NullReturningAggregator) this.aggregator).isAggregationComplete());
}
@@ -247,9 +249,9 @@ public class AggregatorEndpointTests {
}
- private static class TestAggregator implements Aggregator {
+ private static class TestAggregator extends AbstractMessageAggregator {
- public Message> aggregate(List> messages) {
+ public Message> aggregateMessages(List> messages) {
List> sortableList = new ArrayList>(messages);
Collections.sort(sortableList, new MessageSequenceComparator());
StringBuffer buffer = new StringBuffer();
@@ -260,8 +262,9 @@ public class AggregatorEndpointTests {
}
}
-
- private static class NullReturningAggregator implements Aggregator {
+
+
+ private static class NullReturningAggregator extends AbstractMessageAggregator {
private boolean aggregationComplete;
@@ -271,7 +274,7 @@ public class AggregatorEndpointTests {
}
- public Message> aggregate(List> messages) {
+ public Message> aggregateMessages(List> messages) {
this.aggregationComplete = true;
return null;
}
@@ -281,7 +284,7 @@ public class AggregatorEndpointTests {
private static class AggregatorTestTask implements Runnable {
- private AggregatorEndpoint aggregator;
+ private AbstractMessageAggregator aggregator;
private Message> message;
@@ -290,7 +293,7 @@ public class AggregatorEndpointTests {
private CountDownLatch latch;
- AggregatorTestTask(AggregatorEndpoint aggregator, Message> message, CountDownLatch latch) {
+ AggregatorTestTask(AbstractMessageAggregator aggregator, Message> message, CountDownLatch latch) {
this.aggregator = aggregator;
this.message = message;
this.latch = latch;
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java
index 7506201a18..7e6fd41541 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java
@@ -45,63 +45,70 @@ public class MethodInvokingAggregatorTests {
@Test
public void adapterWithNonParameterizedMessageListBasedMethod() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
List> messages = createListOfMessages();
- Message> returnedMessge = aggregator.aggregate(messages);
+ Message> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithWildcardParameterizedMessageBasedMethod() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
List> messages = createListOfMessages();
- Message> returnedMessge = aggregator.aggregate(messages);
+ Message> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithTypeParameterizedMessageBasedMethod() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
List> messages = createListOfMessages();
- Message> returnedMessge = aggregator.aggregate(messages);
+ Message> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithPojoBasedMethod() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStrings");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationOnListOfStrings");
List> messages = createListOfMessages();
- Message> returnedMessge = aggregator.aggregate(messages);
+ Message> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithPojoBasedMethodReturningObject() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStringsReturningLong");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationOnListOfStringsReturningLong");
List> messages = createListOfMessages();
- Message> returnedMessge = aggregator.aggregate(messages);
+ Message> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals(123456789l, returnedMessge.getPayload());
}
@Test
public void adapterWithVoidReturnType() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNoReturn");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationWithNoReturn");
List> messages = createListOfMessages();
- Message> returnedMessage = aggregator.aggregate(messages);
+ Message> returnedMessage = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertNull(returnedMessage);
}
@Test
public void adapterWithNullReturn() {
- Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNullReturn");
+ MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
+ simpleAggregator, "doAggregationWithNullReturn");
List> messages = createListOfMessages();
- Message> returnedMessage = aggregator.aggregate(messages);
+ Message> returnedMessage = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertNull(returnedMessage);
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java
index 56253a0957..5f8d34d37f 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java
@@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
+import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
@@ -54,7 +55,7 @@ public class AggregatorParserTests {
@Test
public void testAggregation() {
MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput");
- TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean");
+ TestAggregatorBean aggregatorBean = (TestAggregatorBean) context.getBean("aggregatorBean");
List> outboundMessages = new ArrayList>();
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
@@ -73,14 +74,15 @@ public class AggregatorParserTests {
public void testPropertyAssignment() throws Exception {
SubscribingConsumerEndpoint endpoint =
(SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator");
- TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
- DirectFieldAccessor accessor = new DirectFieldAccessor(
- new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
- Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance",
- testAggregator, accessor.getPropertyValue("aggregator"));
+ Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
+ Assert.assertEquals(MethodInvokingAggregator.class, consumer.getClass());
+ DirectFieldAccessor accessor = new DirectFieldAccessor(consumer);
+ Method expectedMethod = TestAggregatorBean.class.getMethod("createSingleMessageFromGroup", List.class);
+ Assert.assertEquals("The MethodInvokingAggregator is not injected with the appropriate aggregation method",
+ expectedMethod, new DirectFieldAccessor(accessor.getPropertyValue("methodInvoker")).getPropertyValue("method"));
Assert.assertEquals(
"The AggregatorEndpoint is not injected with the appropriate CompletionStrategy instance",
completionStrategy, accessor.getPropertyValue("completionStrategy"));
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java
similarity index 90%
rename from org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java
rename to org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java
index e2839194e8..ecdd0ad44e 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java
@@ -22,19 +22,21 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.springframework.integration.aggregator.Aggregator;
import org.springframework.integration.aggregator.MessageSequenceComparator;
+import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Marius Bogoevici
*/
-public class TestAggregator implements Aggregator {
+public class TestAggregatorBean {
private final ConcurrentMap