AggregatorEndpoint has been replaced by AbstractMessageAggregator, and the Aggregator interface has been removed. The MethodInvokingAggregator is now capable of detecting a single method with the @Aggregator annotation if no "method" attribute is defined on an <aggregator/> element, and it will fall back to detect a single public Method (else throw a IllegalArgumentException).

This commit is contained in:
Mark Fisher
2008-10-07 03:19:28 +00:00
parent 7991ef889c
commit 05d9528024
13 changed files with 117 additions and 128 deletions

View File

@@ -19,6 +19,7 @@ package org.springframework.integration.aggregator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -45,11 +47,12 @@ public class AggregatorEndpointTests {
private final TaskScheduler taskScheduler = Schedulers.createDefaultTaskScheduler(10);
private AggregatorEndpoint aggregator;
private AbstractMessageAggregator aggregator;
@Before
public void configureAggregator() {
this.aggregator = new AggregatorEndpoint(new TestAggregator());
this.aggregator = new TestAggregator();
this.aggregator.setTaskScheduler(this.taskScheduler);
this.taskScheduler.start();
this.aggregator.start();
@@ -211,27 +214,26 @@ public class AggregatorEndpointTests {
@Test
public void testNullReturningAggregator() throws InterruptedException {
NullReturningAggregator nullReturningAggregator = new NullReturningAggregator();
AggregatorEndpoint aggregator = new AggregatorEndpoint(nullReturningAggregator);
// aggregator.setTaskScheduler(this.taskScheduler);
this.aggregator = new NullReturningAggregator();
this.aggregator.setTaskScheduler(this.taskScheduler);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
AggregatorTestTask task = new AggregatorTestTask(aggregator, message1, latch);
this.taskScheduler.execute(task);
AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch);
this.taskScheduler.execute(task1);
AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch);
this.taskScheduler.execute(task2);
AggregatorTestTask task3 = new AggregatorTestTask(aggregator, message3, latch);
this.taskScheduler.execute(task3);
latch.await(1000, TimeUnit.MILLISECONDS);
assertNull(task.getException());
assertNull(task1.getException());
assertNull(task2.getException());
assertNull(task3.getException());
Message<?> reply = replyChannel.receive(500);
assertNull(reply);
assertEquals(true, nullReturningAggregator.isAggregationComplete());
assertTrue(((NullReturningAggregator) this.aggregator).isAggregationComplete());
}
@@ -247,9 +249,9 @@ public class AggregatorEndpointTests {
}
private static class TestAggregator implements Aggregator {
private static class TestAggregator extends AbstractMessageAggregator {
public Message<?> aggregate(List<Message<?>> messages) {
public Message<?> aggregateMessages(List<Message<?>> messages) {
List<Message<?>> sortableList = new ArrayList<Message<?>>(messages);
Collections.sort(sortableList, new MessageSequenceComparator());
StringBuffer buffer = new StringBuffer();
@@ -260,8 +262,9 @@ public class AggregatorEndpointTests {
}
}
private static class NullReturningAggregator implements Aggregator {
private static class NullReturningAggregator extends AbstractMessageAggregator {
private boolean aggregationComplete;
@@ -271,7 +274,7 @@ public class AggregatorEndpointTests {
}
public Message<?> aggregate(List<Message<?>> messages) {
public Message<?> aggregateMessages(List<Message<?>> messages) {
this.aggregationComplete = true;
return null;
}
@@ -281,7 +284,7 @@ public class AggregatorEndpointTests {
private static class AggregatorTestTask implements Runnable {
private AggregatorEndpoint aggregator;
private AbstractMessageAggregator aggregator;
private Message<?> message;
@@ -290,7 +293,7 @@ public class AggregatorEndpointTests {
private CountDownLatch latch;
AggregatorTestTask(AggregatorEndpoint aggregator, Message<?> message, CountDownLatch latch) {
AggregatorTestTask(AbstractMessageAggregator aggregator, Message<?> message, CountDownLatch latch) {
this.aggregator = aggregator;
this.message = message;
this.latch = latch;

View File

@@ -45,63 +45,70 @@ public class MethodInvokingAggregatorTests {
@Test
public void adapterWithNonParameterizedMessageListBasedMethod() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Message<?> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithWildcardParameterizedMessageBasedMethod() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Message<?> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithTypeParameterizedMessageBasedMethod() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Message<?> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithPojoBasedMethod() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStrings");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationOnListOfStrings");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Message<?> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals("123456789", returnedMessge.getPayload());
}
@Test
public void adapterWithPojoBasedMethodReturningObject() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStringsReturningLong");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationOnListOfStringsReturningLong");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Message<?> returnedMessge = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertEquals(123456789l, returnedMessge.getPayload());
}
@Test
public void adapterWithVoidReturnType() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNoReturn");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationWithNoReturn");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessage = aggregator.aggregate(messages);
Message<?> returnedMessage = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertNull(returnedMessage);
}
@Test
public void adapterWithNullReturn() {
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationWithNullReturn");
MethodInvokingAggregator aggregator = new MethodInvokingAggregator(
simpleAggregator, "doAggregationWithNullReturn");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessage = aggregator.aggregate(messages);
Message<?> returnedMessage = aggregator.aggregateMessages(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
Assert.assertNull(returnedMessage);
}

View File

@@ -30,6 +30,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.SubscribingConsumerEndpoint;
@@ -54,7 +55,7 @@ public class AggregatorParserTests {
@Test
public void testAggregation() {
MessageChannel input = (MessageChannel) context.getBean("aggregatorWithReferenceInput");
TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean");
TestAggregatorBean aggregatorBean = (TestAggregatorBean) context.getBean("aggregatorBean");
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
@@ -73,14 +74,15 @@ public class AggregatorParserTests {
public void testPropertyAssignment() throws Exception {
SubscribingConsumerEndpoint endpoint =
(SubscribingConsumerEndpoint) context.getBean("completelyDefinedAggregator");
TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
DirectFieldAccessor accessor = new DirectFieldAccessor(
new DirectFieldAccessor(endpoint).getPropertyValue("consumer"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance",
testAggregator, accessor.getPropertyValue("aggregator"));
Object consumer = new DirectFieldAccessor(endpoint).getPropertyValue("consumer");
Assert.assertEquals(MethodInvokingAggregator.class, consumer.getClass());
DirectFieldAccessor accessor = new DirectFieldAccessor(consumer);
Method expectedMethod = TestAggregatorBean.class.getMethod("createSingleMessageFromGroup", List.class);
Assert.assertEquals("The MethodInvokingAggregator is not injected with the appropriate aggregation method",
expectedMethod, new DirectFieldAccessor(accessor.getPropertyValue("methodInvoker")).getPropertyValue("method"));
Assert.assertEquals(
"The AggregatorEndpoint is not injected with the appropriate CompletionStrategy instance",
completionStrategy, accessor.getPropertyValue("completionStrategy"));

View File

@@ -22,19 +22,21 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.integration.aggregator.Aggregator;
import org.springframework.integration.aggregator.MessageSequenceComparator;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Marius Bogoevici
*/
public class TestAggregator implements Aggregator {
public class TestAggregatorBean {
private final ConcurrentMap<Object, Message<?>> aggregatedMessages = new ConcurrentHashMap<Object, Message<?>>();
public Message<?> aggregate(List<Message<?>> messages) {
@Aggregator
public Message<?> createSingleMessageFromGroup(List<Message<?>> messages) {
List<Message<?>> sortableList = new ArrayList<Message<?>>(messages);
Collections.sort(sortableList, new MessageSequenceComparator());
StringBuffer buffer = new StringBuffer();

View File

@@ -49,7 +49,7 @@
completion-strategy-method="checkCompleteness"/>
<beans:bean id="aggregatorBean"
class="org.springframework.integration.config.TestAggregator" />
class="org.springframework.integration.config.TestAggregatorBean" />
<beans:bean id="adderBean"
class="org.springframework.integration.config.Adder" />

View File

@@ -24,7 +24,7 @@ import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.AggregatorEndpoint;
import org.springframework.integration.aggregator.AbstractMessageAggregator;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy;
import org.springframework.integration.bus.MessageBus;
@@ -47,11 +47,11 @@ public class AggregatorAnnotationTests {
Assert.assertTrue(accessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(accessor.getPropertyValue("outputChannel"));
Assert.assertNull(accessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_SEND_TIMEOUT, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TIMEOUT, accessor.getPropertyValue("timeout"));
Assert.assertEquals(false, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_REAPER_INTERVAL, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(AbstractMessageAggregator.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
accessor.getPropertyValue("trackedCorrelationIdCapacity"));
}