Renamed AggregatingMessageHandler to AggregatorEndpoint and AggregatorAdapter to MethodInvokingAggregator.

This commit is contained in:
Mark Fisher
2008-09-04 21:00:26 +00:00
parent 3a5bfca9f9
commit 84073697e9
12 changed files with 104 additions and 104 deletions

View File

@@ -39,12 +39,12 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author Mark Fisher
*/
public class AggregatingMessageHandlerTests {
public class AggregatorEndpointTests {
private final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
public AggregatingMessageHandlerTests() {
public AggregatorEndpointTests() {
this.executor.setMaxPoolSize(10);
this.executor.setQueueCapacity(0);
this.executor.afterPropertiesSet();
@@ -53,7 +53,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testCompleteGroupWithinTimeout() throws InterruptedException {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
@@ -71,7 +71,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testShouldNotSendPartialResultOnTimeoutByDefault() throws InterruptedException {
QueueChannel discardChannel = new QueueChannel();
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(50);
aggregator.setReaperInterval(10);
aggregator.setDiscardChannel(discardChannel);
@@ -91,7 +91,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedException {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(500);
aggregator.setReaperInterval(10);
aggregator.setSendPartialResultOnTimeout(true);
@@ -114,7 +114,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testMultipleGroupsSimultaneously() throws InterruptedException {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel1 = new QueueChannel();
QueueChannel replyChannel2 = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel1);
@@ -143,7 +143,7 @@ public class AggregatingMessageHandlerTests {
public void testDiscardChannelForTrackedCorrelationId() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
@@ -155,7 +155,7 @@ public class AggregatingMessageHandlerTests {
public void testTrackedCorrelationIdsCapacityAtLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
@@ -172,7 +172,7 @@ public class AggregatingMessageHandlerTests {
public void testTrackedCorrelationIdsCapacityPassesLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
@@ -190,14 +190,14 @@ public class AggregatingMessageHandlerTests {
@Test(expected=MessageHandlingException.class)
public void testExceptionThrownIfNoCorrelationId() throws InterruptedException {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
Message<?> message = createMessage("123", null, 2, 1, new QueueChannel());
aggregator.handle(message);
}
@Test
public void testAdditionalMessageAfterCompletion() throws InterruptedException {
AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
@@ -243,7 +243,7 @@ public class AggregatingMessageHandlerTests {
private static class AggregatorTestTask implements Runnable {
private AggregatingMessageHandler aggregator;
private AggregatorEndpoint aggregator;
private Message<?> message;
@@ -252,7 +252,7 @@ public class AggregatingMessageHandlerTests {
private CountDownLatch latch;
AggregatorTestTask(AggregatingMessageHandler aggregator, Message<?> message, CountDownLatch latch) {
AggregatorTestTask(AggregatorEndpoint aggregator, Message<?> message, CountDownLatch latch) {
this.aggregator = aggregator;
this.message = message;
this.latch = latch;

View File

@@ -25,7 +25,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.aggregator.AggregatorAdapter;
import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.message.GenericMessage;
@@ -130,7 +130,7 @@ public class CompletionStrategyAdapterTests {
@Test(expected = ConfigurationException.class)
public void testInvalidParameterTypeUsingMethodObject() throws SecurityException, NoSuchMethodException {
new AggregatorAdapter(simpleCompletionStrategy, simpleCompletionStrategy.getClass().getMethod(
new MethodInvokingAggregator(simpleCompletionStrategy, simpleCompletionStrategy.getClass().getMethod(
"invalidParameterType", String.class));
}
@@ -160,19 +160,19 @@ public class CompletionStrategyAdapterTests {
@Test(expected = IllegalArgumentException.class)
public void testNullObject() {
new AggregatorAdapter(null, "doesNotMatter");
new MethodInvokingAggregator(null, "doesNotMatter");
}
@Test(expected = IllegalArgumentException.class)
public void testNullMethodName() {
String methodName = null;
new AggregatorAdapter(simpleCompletionStrategy, methodName);
new MethodInvokingAggregator(simpleCompletionStrategy, methodName);
}
@Test(expected = IllegalArgumentException.class)
public void testNullMethodObject() {
Method method = null;
new AggregatorAdapter(simpleCompletionStrategy, method);
new MethodInvokingAggregator(simpleCompletionStrategy, method);
}
private static List<Message<?>> createListOfMessages() {

View File

@@ -26,7 +26,7 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.aggregator.Aggregator;
import org.springframework.integration.aggregator.AggregatorAdapter;
import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -34,7 +34,7 @@ import org.springframework.integration.message.Message;
* @author Marius Bogoevici
* @author Mark Fisher
*/
public class AggregatorAdapterTests {
public class MethodInvokingAggregatorTests {
private SimpleAggregator simpleAggregator;
@@ -46,7 +46,7 @@ public class AggregatorAdapterTests {
@Test
public void testAdapterWithNonParameterizedMessageListBasedMethod() {
Aggregator aggregator = new AggregatorAdapter(simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnNonParameterizedListOfMessages");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
@@ -55,7 +55,7 @@ public class AggregatorAdapterTests {
@Test
public void testAdapterWithWildcardParametrizedMessageBasedMethod() {
Aggregator aggregator = new AggregatorAdapter(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithWildcard");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
@@ -64,7 +64,7 @@ public class AggregatorAdapterTests {
@Test
public void testAdapterWithTypeParametrizedMessageBasedMethod() {
Aggregator aggregator = new AggregatorAdapter(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfMessagesParametrizedWithString");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
@@ -73,7 +73,7 @@ public class AggregatorAdapterTests {
@Test
public void testAdapterWithPojoBasedMethod() {
Aggregator aggregator = new AggregatorAdapter(simpleAggregator, "doAggregationOnListOfStrings");
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStrings");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
@@ -82,7 +82,7 @@ public class AggregatorAdapterTests {
@Test
public void testAdapterWithPojoBasedMethodReturningObject() {
Aggregator aggregator = new AggregatorAdapter(simpleAggregator, "doAggregationOnListOfStringsReturningLong");
Aggregator aggregator = new MethodInvokingAggregator(simpleAggregator, "doAggregationOnListOfStringsReturningLong");
List<Message<?>> messages = createListOfMessages();
Message<?> returnedMessge = aggregator.aggregate(messages);
Assert.assertTrue(simpleAggregator.isAggregationPerformed());
@@ -91,68 +91,68 @@ public class AggregatorAdapterTests {
@Test(expected=ConfigurationException.class)
public void testAdapterWithWrongMethodName() {
new AggregatorAdapter(simpleAggregator, "methodThatDoesNotExist");
new MethodInvokingAggregator(simpleAggregator, "methodThatDoesNotExist");
}
@Test(expected=ConfigurationException.class)
public void testInvalidParameterTypeUsingMethodName() {
new AggregatorAdapter(simpleAggregator, "invalidParameterType");
new MethodInvokingAggregator(simpleAggregator, "invalidParameterType");
}
@Test(expected=ConfigurationException.class)
public void testTooManyParametersUsingMethodName() {
new AggregatorAdapter(simpleAggregator, "tooManyParameters");
new MethodInvokingAggregator(simpleAggregator, "tooManyParameters");
}
@Test(expected=ConfigurationException.class)
public void testNotEnoughParametersUsingMethodName() {
new AggregatorAdapter(simpleAggregator, "notEnoughParameters");
new MethodInvokingAggregator(simpleAggregator, "notEnoughParameters");
}
@Test(expected=ConfigurationException.class)
public void testListSubclassParameterUsingMethodName() {
new AggregatorAdapter(simpleAggregator, "ListSubclassParameter");
new MethodInvokingAggregator(simpleAggregator, "ListSubclassParameter");
}
@Test(expected=ConfigurationException.class)
public void testInvalidParameterTypeUsingMethodObject() throws SecurityException, NoSuchMethodException {
new AggregatorAdapter(simpleAggregator, simpleAggregator.getClass().getMethod(
new MethodInvokingAggregator(simpleAggregator, simpleAggregator.getClass().getMethod(
"invalidParameterType", String.class));
}
@Test(expected=ConfigurationException.class)
public void testTooManyParametersUsingMethodObject() throws SecurityException, NoSuchMethodException {
new AggregatorAdapter(simpleAggregator, simpleAggregator.getClass().getMethod(
new MethodInvokingAggregator(simpleAggregator, simpleAggregator.getClass().getMethod(
"tooManyParameters", List.class, List.class));
}
@Test(expected=ConfigurationException.class)
public void testNotEnoughParametersUsingMethodObject() throws SecurityException, NoSuchMethodException {
new AggregatorAdapter(simpleAggregator, simpleAggregator.getClass().getMethod(
new MethodInvokingAggregator(simpleAggregator, simpleAggregator.getClass().getMethod(
"notEnoughParameters", new Class[] {} ));
}
@Test(expected= ConfigurationException.class)
public void testListSubclassParameterUsingMethodObject() throws SecurityException, NoSuchMethodException {
new AggregatorAdapter(simpleAggregator, simpleAggregator.getClass().getMethod(
new MethodInvokingAggregator(simpleAggregator, simpleAggregator.getClass().getMethod(
"listSubclassParameter", new Class[] {LinkedList.class} ));
}
@Test(expected=IllegalArgumentException.class)
public void testNullObject() {
new AggregatorAdapter(null, "doesNotMatter");
new MethodInvokingAggregator(null, "doesNotMatter");
}
@Test(expected=IllegalArgumentException.class)
public void testNullMethodName() {
String methodName = null;
new AggregatorAdapter(simpleAggregator, methodName);
new MethodInvokingAggregator(simpleAggregator, methodName);
}
@Test(expected=IllegalArgumentException.class)
public void testNullMethodObject() {
Method method = null;
new AggregatorAdapter(simpleAggregator, method);
new MethodInvokingAggregator(simpleAggregator, method);
}

View File

@@ -28,7 +28,7 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.AggregatorEndpoint;
import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.channel.MessageChannel;
@@ -53,15 +53,15 @@ public class AggregatorParserTests {
@Test
public void testAggregation() {
AggregatingMessageHandler aggregatingHandler =
(AggregatingMessageHandler) context.getBean("aggregatorWithReference");
AggregatorEndpoint endpoint =
(AggregatorEndpoint) context.getBean("aggregatorWithReference");
TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean");
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
for (Message<?> message : outboundMessages) {
aggregatingHandler.send(message);
endpoint.send(message);
}
Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean
.getAggregatedMessages().size());
@@ -72,41 +72,41 @@ public class AggregatorParserTests {
@Test
public void testPropertyAssignment() throws Exception {
AggregatingMessageHandler completeAggregatingMessageHandler =
(AggregatingMessageHandler) context.getBean("completelyDefinedAggregator");
AggregatorEndpoint endpoint =
(AggregatorEndpoint) context.getBean("completelyDefinedAggregator");
TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeAggregatingMessageHandler);
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate Aggregator instance",
testAggregator, messageHandlerFieldAccessor.getPropertyValue("aggregator"));
DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint);
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate Aggregator instance",
testAggregator, accessor.getPropertyValue("aggregator"));
Assert.assertEquals(
"The AggregatingMessageHandler is not injected with the appropriate CompletionStrategy instance",
completionStrategy, messageHandlerFieldAccessor.getPropertyValue("completionStrategy"));
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate output channel",
outputChannel, messageHandlerFieldAccessor.getPropertyValue("target"));
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate discard channel",
discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatingMessageHandler is not set with the appropriate timeout value", 86420000l,
messageHandlerFieldAccessor.getPropertyValue("sendTimeout"));
"The AggregatorEndpoint is not injected with the appropriate CompletionStrategy instance",
completionStrategy, accessor.getPropertyValue("completionStrategy"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate output channel",
outputChannel, accessor.getPropertyValue("target"));
Assert.assertEquals("The AggregatorEndpoint is not injected with the appropriate discard channel",
discardChannel, accessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatorEndpoint is not set with the appropriate timeout value",
86420000l, accessor.getPropertyValue("sendTimeout"));
Assert.assertEquals(
"The AggregatingMessageHandler is not configured with the appropriate 'send partial results on timeout' flag",
true, messageHandlerFieldAccessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals("The AggregatingMessageHandler is not configured with the appropriate reaper interval",
135l, messageHandlerFieldAccessor.getPropertyValue("reaperInterval"));
"The AggregatorEndpoint is not configured with the appropriate 'send partial results on timeout' flag",
true, accessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals("The AggregatorEndpoint is not configured with the appropriate reaper interval",
135l, accessor.getPropertyValue("reaperInterval"));
Assert.assertEquals(
"The AggregatingMessageHandler is not configured with the appropriate tracked correlationId capacity",
99, messageHandlerFieldAccessor.getPropertyValue("trackedCorrelationIdCapacity"));
Assert.assertEquals("The AggregatingMessageHandler is not configured with the appropriate timeout",
42l, messageHandlerFieldAccessor.getPropertyValue("timeout"));
"The AggregatorEndpoint is not configured with the appropriate tracked correlationId capacity",
99, accessor.getPropertyValue("trackedCorrelationIdCapacity"));
Assert.assertEquals("The AggregatorEndpoint is not configured with the appropriate timeout",
42l, accessor.getPropertyValue("timeout"));
}
@Test
public void testSimpleJavaBeanAggregator() {
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
AggregatingMessageHandler addingAggregator =
(AggregatingMessageHandler) context.getBean("aggregatorWithReferenceAndMethod");
AggregatorEndpoint addingAggregator =
(AggregatorEndpoint) context.getBean("aggregatorWithReferenceAndMethod");
outboundMessages.add(createMessage(1l, "id1", 3, 1, null));
outboundMessages.add(createMessage(2l, "id1", 3, 3, null));
outboundMessages.add(createMessage(3l, "id1", 3, 2, null));
@@ -131,8 +131,8 @@ public class AggregatorParserTests {
@Test
public void testAggregatorWithPojoCompletionStrategy(){
AggregatingMessageHandler aggregatorWithPojoCompletionStrategy =
(AggregatingMessageHandler) context.getBean("aggregatorWithPojoCompletionStrategy");
AggregatorEndpoint aggregatorWithPojoCompletionStrategy =
(AggregatorEndpoint) context.getBean("aggregatorWithPojoCompletionStrategy");
CompletionStrategy completionStrategy = (CompletionStrategy)
new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy");
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);

View File

@@ -24,7 +24,7 @@ import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.AggregatorEndpoint;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy;
import org.springframework.integration.bus.MessageBus;
@@ -46,14 +46,14 @@ public class AggregatorAnnotationTests {
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("target"));
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor
Assert.assertEquals(AggregatorEndpoint.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor
.getPropertyValue("sendTimeout"));
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_TIMEOUT, aggregatingMessageHandlerAccessor
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TIMEOUT, aggregatingMessageHandlerAccessor
.getPropertyValue("timeout"));
Assert.assertEquals(false, aggregatingMessageHandlerAccessor.getPropertyValue("sendPartialResultOnTimeout"));
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_REAPER_INTERVAL, aggregatingMessageHandlerAccessor
Assert.assertEquals(AggregatorEndpoint.DEFAULT_REAPER_INTERVAL, aggregatingMessageHandlerAccessor
.getPropertyValue("reaperInterval"));
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
Assert.assertEquals(AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY,
aggregatingMessageHandlerAccessor.getPropertyValue("trackedCorrelationIdCapacity"));
}
@@ -101,7 +101,7 @@ public class AggregatorAnnotationTests {
@SuppressWarnings("unchecked")
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
MessageBus messageBus = this.getMessageBus(context);
AggregatingMessageHandler endpoint = (AggregatingMessageHandler) messageBus.lookupEndpoint(endpointName + ".aggregator");
AggregatorEndpoint endpoint = (AggregatorEndpoint) messageBus.lookupEndpoint(endpointName + ".aggregator");
return new DirectFieldAccessor(endpoint);
}