From 05d9528024fc4c335c401d689b30e2cd2d5d0764 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Tue, 7 Oct 2008 03:19:28 +0000 Subject: [PATCH] AggregatorEndpoint has been replaced by AbstractMessageAggregator, and the Aggregator interface has been removed. The MethodInvokingAggregator is now capable of detecting a single method with the @Aggregator annotation if no "method" attribute is defined on an element, and it will fall back to detect a single public Method (else throw a IllegalArgumentException). --- ...nt.java => AbstractMessageAggregator.java} | 44 +++++++------------ .../integration/aggregator/Aggregator.java | 33 -------------- .../aggregator/MessageListMethodAdapter.java | 2 +- .../aggregator/MethodInvokingAggregator.java | 33 ++++++++++---- .../integration/annotation/Aggregator.java | 10 ++--- .../integration/config/AggregatorParser.java | 12 +++-- .../AggregatorAnnotationPostProcessor.java | 7 ++- .../aggregator/AggregatorEndpointTests.java | 35 ++++++++------- .../MethodInvokingAggregatorTests.java | 35 +++++++++------ .../config/AggregatorParserTests.java | 14 +++--- ...ggregator.java => TestAggregatorBean.java} | 8 ++-- .../config/aggregatorParserTests.xml | 2 +- .../annotation/AggregatorAnnotationTests.java | 10 ++--- 13 files changed, 117 insertions(+), 128 deletions(-) rename org.springframework.integration/src/main/java/org/springframework/integration/aggregator/{AggregatorEndpoint.java => AbstractMessageAggregator.java} (60%) delete mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java rename org.springframework.integration/src/test/java/org/springframework/integration/config/{TestAggregator.java => TestAggregatorBean.java} (90%) diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java similarity index 60% rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java index 87a7d3c639..71eef17a21 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageAggregator.java @@ -20,46 +20,30 @@ import java.util.List; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.util.Assert; /** - * An {@link AbstractMessageBarrierConsumer} that waits for a complete - * group of {@link Message Messages} to arrive and then delegates to an - * {@link Aggregator} to combine them into a single {@link Message}. - *

- * The default strategy for determining whether a group is complete is based on - * the 'sequenceSize' property of the header. Alternatively, a + * A base class for aggregating a group of Messages into a single Message. + * Extends {@link AbstractMessageBarrierConsumer} and waits for a + * complete group of {@link Message Messages} to arrive. Subclasses + * must provide the implementation of the {@link #aggregateMessages(List)} + * method to combine the group of Messages into a single {@link Message}. + * + *

The default strategy for determining whether a group is complete is based + * on the 'sequenceSize' property of the header. Alternatively, a * custom implementation of the {@link CompletionStrategy} may be provided. - *

- * All considerations regarding timeout and grouping by ' - * correlationId' from {@link AbstractMessageBarrierConsumer} + * + *

All considerations regarding timeout and grouping by + * correlationId from {@link AbstractMessageBarrierConsumer} * apply here as well. * * @author Mark Fisher * @author Marius Bogoevici */ -public class AggregatorEndpoint extends AbstractMessageBarrierConsumer { - - private final Aggregator aggregator; +public abstract class AbstractMessageAggregator extends AbstractMessageBarrierConsumer { private volatile CompletionStrategy completionStrategy = new SequenceSizeCompletionStrategy(); - /** - * Create an endpoint that delegates to the provided Aggregator to combine a - * group of messages into a single message. The executor will be used for - * scheduling a background maintenance thread. If null, a new - * single-threaded executor will be created. - */ - public AggregatorEndpoint(Aggregator aggregator, TaskScheduler executor) { - super(); - Assert.notNull(aggregator, "'aggregator' must not be null"); - this.aggregator = aggregator; - } - - public AggregatorEndpoint(Aggregator aggregator) { - this(aggregator, null); - } /** * Strategy to determine whether the group of messages is complete. @@ -78,7 +62,7 @@ public class AggregatorEndpoint extends AbstractMessageBarrierConsumer { } protected Message[] processReleasedMessages(Object correlationId, List> messages) { - Message result = aggregator.aggregate(messages); + Message result = this.aggregateMessages(messages); if (result == null) { return new Message[0]; } @@ -88,4 +72,6 @@ public class AggregatorEndpoint extends AbstractMessageBarrierConsumer { return new Message[] { result }; } + protected abstract Message aggregateMessages(List> messages); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java deleted file mode 100644 index 7ab2db8ff5..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/Aggregator.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.aggregator; - -import java.util.List; - -import org.springframework.integration.message.Message; - -/** - * Strategy interface for aggregating a list of {@link Message Messages} into a - * single {@link Message}. - * - * @author Mark Fisher - */ -public interface Aggregator { - - Message aggregate(List> messages); - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java index 7f765b17d3..9ca3ef2504 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MessageListMethodAdapter.java @@ -35,7 +35,7 @@ import org.springframework.util.ReflectionUtils; * * @author Marius Bogoevici */ -public abstract class MessageListMethodAdapter { +public class MessageListMethodAdapter { private final DefaultMethodInvoker invoker; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java index f032fd12a3..289bb52f58 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java @@ -19,34 +19,51 @@ package org.springframework.integration.aggregator; import java.lang.reflect.Method; import java.util.List; +import org.springframework.integration.annotation.Aggregator; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; +import org.springframework.integration.util.DefaultMethodResolver; +import org.springframework.integration.util.MethodResolver; +import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; /** - * Aggregator adapter for methods annotated with {@link org.springframework.integration.annotation.Aggregator @Aggregator} - * and for 'aggregator' elements that include a 'method' attribute - * (e.g. <aggregator ref="beanReference" method="methodName"/>). + * {@link AbstractMessageAggregator} adapter for methods annotated with + * {@link Aggregator @Aggregator} annotation and for aggregator + * elements (e.g. <aggregator ref="beanReference" method="methodName"/>). * * @author Marius Bogoevici * @author Mark Fisher */ -public class MethodInvokingAggregator extends MessageListMethodAdapter implements Aggregator { +public class MethodInvokingAggregator extends AbstractMessageAggregator { + + private final MethodResolver methodResolver = new DefaultMethodResolver(Aggregator.class); + + private final MessageListMethodAdapter methodInvoker; + public MethodInvokingAggregator(Object object, Method method) { - super(object, method); + this.methodInvoker = new MessageListMethodAdapter(object, method); } public MethodInvokingAggregator(Object object, String methodName) { - super(object, methodName); + this.methodInvoker = new MessageListMethodAdapter(object, methodName); + } + + public MethodInvokingAggregator(Object object) { + Assert.notNull(object, "object must not be null"); + Method method = this.methodResolver.findMethod(object.getClass()); + Assert.notNull(method, "unable to resolve Aggregator method on target class [" + + object.getClass() + "]"); + this.methodInvoker = new MessageListMethodAdapter(object, method); } - public Message aggregate(List> messages) { + public Message aggregateMessages(List> messages) { if (CollectionUtils.isEmpty(messages)) { return null; } - Object returnedValue = this.executeMethod(messages); + Object returnedValue = this.methodInvoker.executeMethod(messages); if (returnedValue == null) { return null; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java index 9c817c735a..58bd328e72 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java @@ -22,7 +22,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; -import org.springframework.integration.aggregator.AggregatorEndpoint; +import org.springframework.integration.aggregator.AbstractMessageAggregator; /** * Indicates that a method is capable of aggregating messages. @@ -56,12 +56,12 @@ public @interface Aggregator { /** * timeout for sending results to the reply target (in milliseconds) */ - long sendTimeout() default AggregatorEndpoint.DEFAULT_SEND_TIMEOUT; + long sendTimeout() default AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT; /** * maximum time to wait for completion (in milliseconds) */ - long timeout() default AggregatorEndpoint.DEFAULT_TIMEOUT; + long timeout() default AbstractMessageAggregator.DEFAULT_TIMEOUT; /** * indicates whether to send an incomplete aggregate on timeout @@ -71,13 +71,13 @@ public @interface Aggregator { /** * interval for the task that checks for timed-out aggregates */ - long reaperInterval() default AggregatorEndpoint.DEFAULT_REAPER_INTERVAL; + long reaperInterval() default AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL; /** * maximum number of correlation IDs to maintain so that received messages * may be recognized as belonging to an aggregate that has already completed * or timed out */ - int trackedCorrelationIdCapacity() default AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY; + int trackedCorrelationIdCapacity() default AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java index f9ec546bd7..2fc6461913 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java @@ -21,9 +21,9 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.integration.aggregator.AggregatorEndpoint; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.aggregator.MethodInvokingAggregator; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; /** @@ -56,8 +56,14 @@ public class AggregatorParser extends AbstractEndpointParser { @Override protected BeanDefinitionBuilder parseConsumer(Element element, ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(AggregatorEndpoint.class); - builder.addConstructorArgReference(this.parseAdapter(element, parserContext, MethodInvokingAggregator.class)); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MethodInvokingAggregator.class); + String ref = element.getAttribute(REF_ATTRIBUTE); + Assert.hasText(ref, "The '" + REF_ATTRIBUTE + "' attribute is required."); + builder.addConstructorArgReference(ref); + if (StringUtils.hasText(element.getAttribute(METHOD_ATTRIBUTE))) { + String method = element.getAttribute(METHOD_ATTRIBUTE); + builder.addConstructorArgValue(method); + } IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_TIMEOUT_ATTRIBUTE); IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, SEND_PARTIAL_RESULT_ON_TIMEOUT_ATTRIBUTE); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java index 99c6c94102..36140b3b73 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java @@ -20,7 +20,7 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import org.springframework.core.annotation.AnnotationUtils; -import org.springframework.integration.aggregator.AggregatorEndpoint; +import org.springframework.integration.aggregator.AbstractMessageAggregator; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.integration.annotation.Aggregator; @@ -46,8 +46,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP @Override protected MessageConsumer createConsumer(Object bean, Method method, Aggregator annotation) { - MethodInvokingAggregator adapter = new MethodInvokingAggregator(bean, method); - AggregatorEndpoint aggregator = new AggregatorEndpoint(adapter); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator(bean, method); this.configureCompletionStrategy(bean, aggregator); String discardChannelName = annotation.discardChannel(); if (StringUtils.hasText(discardChannelName)) { @@ -64,7 +63,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP return aggregator; } - private void configureCompletionStrategy(final Object bean, final AggregatorEndpoint aggregator) { + private void configureCompletionStrategy(final Object bean, final AbstractMessageAggregator aggregator) { ReflectionUtils.doWithMethods(bean.getClass(), new ReflectionUtils.MethodCallback() { public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { Annotation annotation = AnnotationUtils.getAnnotation(method, CompletionStrategy.class); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java index 9db00b2cb4..85c6782a57 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java @@ -19,6 +19,7 @@ package org.springframework.integration.aggregator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.Collections; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; import org.junit.Test; + import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -45,11 +47,12 @@ public class AggregatorEndpointTests { private final TaskScheduler taskScheduler = Schedulers.createDefaultTaskScheduler(10); - private AggregatorEndpoint aggregator; + private AbstractMessageAggregator aggregator; + @Before public void configureAggregator() { - this.aggregator = new AggregatorEndpoint(new TestAggregator()); + this.aggregator = new TestAggregator(); this.aggregator.setTaskScheduler(this.taskScheduler); this.taskScheduler.start(); this.aggregator.start(); @@ -211,27 +214,26 @@ public class AggregatorEndpointTests { @Test public void testNullReturningAggregator() throws InterruptedException { - NullReturningAggregator nullReturningAggregator = new NullReturningAggregator(); - AggregatorEndpoint aggregator = new AggregatorEndpoint(nullReturningAggregator); -// aggregator.setTaskScheduler(this.taskScheduler); + this.aggregator = new NullReturningAggregator(); + this.aggregator.setTaskScheduler(this.taskScheduler); QueueChannel replyChannel = new QueueChannel(); Message message1 = createMessage("123", "ABC", 3, 1, replyChannel); Message message2 = createMessage("456", "ABC", 3, 2, replyChannel); Message message3 = createMessage("789", "ABC", 3, 3, replyChannel); CountDownLatch latch = new CountDownLatch(3); - AggregatorTestTask task = new AggregatorTestTask(aggregator, message1, latch); - this.taskScheduler.execute(task); + AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch); + this.taskScheduler.execute(task1); AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch); this.taskScheduler.execute(task2); AggregatorTestTask task3 = new AggregatorTestTask(aggregator, message3, latch); this.taskScheduler.execute(task3); latch.await(1000, TimeUnit.MILLISECONDS); - assertNull(task.getException()); + assertNull(task1.getException()); assertNull(task2.getException()); assertNull(task3.getException()); Message reply = replyChannel.receive(500); assertNull(reply); - assertEquals(true, nullReturningAggregator.isAggregationComplete()); + assertTrue(((NullReturningAggregator) this.aggregator).isAggregationComplete()); } @@ -247,9 +249,9 @@ public class AggregatorEndpointTests { } - private static class TestAggregator implements Aggregator { + private static class TestAggregator extends AbstractMessageAggregator { - public Message aggregate(List> messages) { + public Message aggregateMessages(List> messages) { List> sortableList = new ArrayList>(messages); Collections.sort(sortableList, new MessageSequenceComparator()); StringBuffer buffer = new StringBuffer(); @@ -260,8 +262,9 @@ public class AggregatorEndpointTests { } } - - private static class NullReturningAggregator implements Aggregator { + + + private static class NullReturningAggregator extends AbstractMessageAggregator { private boolean aggregationComplete; @@ -271,7 +274,7 @@ public class AggregatorEndpointTests { } - public Message aggregate(List> messages) { + public Message aggregateMessages(List> messages) { this.aggregationComplete = true; return null; } @@ -281,7 +284,7 @@ public class AggregatorEndpointTests { private static class AggregatorTestTask implements Runnable { - private AggregatorEndpoint aggregator; + private AbstractMessageAggregator aggregator; private Message message; @@ -290,7 +293,7 @@ public class AggregatorEndpointTests { private CountDownLatch latch; - AggregatorTestTask(AggregatorEndpoint aggregator, Message message, CountDownLatch latch) { + AggregatorTestTask(AbstractMessageAggregator aggregator, Message message, CountDownLatch latch) { this.aggregator = aggregator; this.message = message; this.latch = latch; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java index 7506201a18..7e6fd41541 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/MethodInvokingAggregatorTests.java @@ -45,63 +45,70 @@ public class MethodInvokingAggregatorTests { @Test public void adapterWithNonParameterizedMessageListBasedMethod() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnNonParameterizedListOfMessages"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationOnNonParameterizedListOfMessages"); List> messages = createListOfMessages(); - Message returnedMessge = aggregator.aggregate(messages); + Message returnedMessge = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertEquals("123456789", returnedMessge.getPayload()); } @Test public void adapterWithWildcardParameterizedMessageBasedMethod() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard"); List> messages = createListOfMessages(); - Message returnedMessge = aggregator.aggregate(messages); + Message returnedMessge = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertEquals("123456789", returnedMessge.getPayload()); } @Test public void adapterWithTypeParameterizedMessageBasedMethod() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString"); List> messages = createListOfMessages(); - Message returnedMessge = aggregator.aggregate(messages); + Message returnedMessge = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertEquals("123456789", returnedMessge.getPayload()); } @Test public void adapterWithPojoBasedMethod() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStrings"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationOnListOfStrings"); List> messages = createListOfMessages(); - Message returnedMessge = aggregator.aggregate(messages); + Message returnedMessge = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertEquals("123456789", returnedMessge.getPayload()); } @Test public void adapterWithPojoBasedMethodReturningObject() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStringsReturningLong"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationOnListOfStringsReturningLong"); List> messages = createListOfMessages(); - Message returnedMessge = aggregator.aggregate(messages); + Message returnedMessge = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertEquals(123456789l, returnedMessge.getPayload()); } @Test public void adapterWithVoidReturnType() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNoReturn"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationWithNoReturn"); List> messages = createListOfMessages(); - Message returnedMessage = aggregator.aggregate(messages); + Message returnedMessage = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertNull(returnedMessage); } @Test public void adapterWithNullReturn() { - Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNullReturn"); + MethodInvokingAggregator aggregator = new MethodInvokingAggregator( + simpleAggregator, "doAggregationWithNullReturn"); List> messages = createListOfMessages(); - Message returnedMessage = aggregator.aggregate(messages); + Message returnedMessage = aggregator.aggregateMessages(messages); Assert.assertTrue(simpleAggregator.isAggregationPerformed()); Assert.assertNull(returnedMessage); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 56253a0957..5f8d34d37f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.aggregator.CompletionStrategy; import org.springframework.integration.aggregator.CompletionStrategyAdapter; +import org.springframework.integration.aggregator.MethodInvokingAggregator; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; @@ -54,7 +55,7 @@ public class AggregatorParserTests { @Test public void testAggregation() { MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput"); - TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean"); + TestAggregatorBean aggregatorBean = (TestAggregatorBean) context.getBean("aggregatorBean"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 1, null)); outboundMessages.add(createMessage("789", "id1", 3, 3, null)); @@ -73,14 +74,15 @@ public class AggregatorParserTests { public void testPropertyAssignment() throws Exception { SubscribingConsumerEndpoint endpoint = (SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator"); - TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean"); CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor accessor = new DirectFieldAccessor( - new DirectFieldAccessor(endpoint).getPropertyValue("consumer")); - Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance", - testAggregator, accessor.getPropertyValue("aggregator")); + Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("consumer"); + Assert.assertEquals(MethodInvokingAggregator.class, consumer.getClass()); + DirectFieldAccessor accessor = new DirectFieldAccessor(consumer); + Method expectedMethod = TestAggregatorBean.class.getMethod("createSingleMessageFromGroup", List.class); + Assert.assertEquals("The MethodInvokingAggregator is not injected with the appropriate aggregation method", + expectedMethod, new DirectFieldAccessor(accessor.getPropertyValue("methodInvoker")).getPropertyValue("method")); Assert.assertEquals( "The AggregatorEndpoint is not injected with the appropriate CompletionStrategy instance", completionStrategy, accessor.getPropertyValue("completionStrategy")); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java similarity index 90% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java rename to org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java index e2839194e8..ecdd0ad44e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregator.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAggregatorBean.java @@ -22,19 +22,21 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.springframework.integration.aggregator.Aggregator; import org.springframework.integration.aggregator.MessageSequenceComparator; +import org.springframework.integration.annotation.Aggregator; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; /** * @author Marius Bogoevici */ -public class TestAggregator implements Aggregator { +public class TestAggregatorBean { private final ConcurrentMap> aggregatedMessages = new ConcurrentHashMap>(); - public Message aggregate(List> messages) { + + @Aggregator + public Message createSingleMessageFromGroup(List> messages) { List> sortableList = new ArrayList>(messages); Collections.sort(sortableList, new MessageSequenceComparator()); StringBuffer buffer = new StringBuffer(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml index 973e9bc8d2..4cea807346 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/aggregatorParserTests.xml @@ -49,7 +49,7 @@ completion-strategy-method="checkCompleteness"/> + class="org.springframework.integration.config.TestAggregatorBean" /> diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index 4f340993fd..b1dea7b150 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -24,7 +24,7 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.AggregatorEndpoint; +import org.springframework.integration.aggregator.AbstractMessageAggregator; import org.springframework.integration.aggregator.CompletionStrategyAdapter; import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy; import org.springframework.integration.bus.MessageBus; @@ -47,11 +47,11 @@ public class AggregatorAnnotationTests { Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy); Assert.assertNull(accessor.getPropertyValue("outputChannel")); Assert.assertNull(accessor.getPropertyValue("discardChannel")); - Assert.assertEquals(AggregatorEndpoint.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout")); - Assert.assertEquals(AggregatorEndpoint.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout")); + Assert.assertEquals(AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout")); + Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout")); Assert.assertEquals(false, accessor.getPropertyValue("sendPartialResultOnTimeout")); - Assert.assertEquals(AggregatorEndpoint.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval")); - Assert.assertEquals(AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY, + Assert.assertEquals(AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval")); + Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY, accessor.getPropertyValue("trackedCorrelationIdCapacity")); }