Aggregator and Resequencer handlers are now endpoint implementations instead of MessageHandler.

This commit is contained in:
Mark Fisher
2008-09-04 20:45:23 +00:00
parent ccf691c168
commit 3a5bfca9f9
15 changed files with 116 additions and 139 deletions

View File

@@ -33,7 +33,6 @@ import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.util.MethodInvoker;
@@ -54,16 +53,15 @@ public class AggregatorParserTests {
@Test
public void testAggregation() {
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) context.getBean("aggregatorWithReference");
AggregatingMessageHandler aggregatingHandler =
(AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
(AggregatingMessageHandler) context.getBean("aggregatorWithReference");
TestAggregator aggregatorBean = (TestAggregator) context.getBean("aggregatorBean");
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
outboundMessages.add(createMessage("123", "id1", 3, 1, null));
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
for (Message<?> message : outboundMessages) {
aggregatingHandler.handle(message);
aggregatingHandler.send(message);
}
Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean
.getAggregatedMessages().size());
@@ -74,9 +72,8 @@ public class AggregatorParserTests {
@Test
public void testPropertyAssignment() throws Exception {
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) context.getBean("completelyDefinedAggregator");
AggregatingMessageHandler completeAggregatingMessageHandler =
(AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
(AggregatingMessageHandler) context.getBean("completelyDefinedAggregator");
TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
@@ -88,7 +85,7 @@ public class AggregatorParserTests {
"The AggregatingMessageHandler is not injected with the appropriate CompletionStrategy instance",
completionStrategy, messageHandlerFieldAccessor.getPropertyValue("completionStrategy"));
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate output channel",
outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
outputChannel, messageHandlerFieldAccessor.getPropertyValue("target"));
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate discard channel",
discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The AggregatingMessageHandler is not set with the appropriate timeout value", 86420000l,
@@ -108,14 +105,13 @@ public class AggregatorParserTests {
@Test
public void testSimpleJavaBeanAggregator() {
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) context.getBean("aggregatorWithReferenceAndMethod");
AggregatingMessageHandler addingAggregator =
(AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
(AggregatingMessageHandler) context.getBean("aggregatorWithReferenceAndMethod");
outboundMessages.add(createMessage(1l, "id1", 3, 1, null));
outboundMessages.add(createMessage(2l, "id1", 3, 3, null));
outboundMessages.add(createMessage(3l, "id1", 3, 2, null));
for (Message<?> message : outboundMessages) {
addingAggregator.handle(message);
addingAggregator.send(message);
}
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> response = outputChannel.receive();
@@ -135,9 +131,8 @@ public class AggregatorParserTests {
@Test
public void testAggregatorWithPojoCompletionStrategy(){
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) context.getBean("aggregatorWithPojoCompletionStrategy");
AggregatingMessageHandler aggregatorWithPojoCompletionStrategy =
(AggregatingMessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
(AggregatingMessageHandler) context.getBean("aggregatorWithPojoCompletionStrategy");
CompletionStrategy completionStrategy = (CompletionStrategy)
new DirectFieldAccessor(aggregatorWithPojoCompletionStrategy).getPropertyValue("completionStrategy");
Assert.assertTrue(completionStrategy instanceof CompletionStrategyAdapter);
@@ -145,13 +140,13 @@ public class AggregatorParserTests {
MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker");
Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy);
Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness"));
aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(1l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(2l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(3l, "id1", 0 , 0, null));
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> reply = outputChannel.receive(0);
Assert.assertNull(reply);
aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(5l, "id1", 0 , 0, null));
reply = outputChannel.receive(0);
Assert.assertNotNull(reply);
Assert.assertEquals(11l, reply.getPayload());

View File

@@ -48,15 +48,14 @@ public class ResequencerParserTests {
@Test
public void testResequencing() {
ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context
.getBean("defaultResequencer");
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel));
outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel));
outboundMessages.add(createMessage("456", "id1", 3, 2, outputChannel));
for (Message<?> message : outboundMessages) {
resequencingHandler.handle(message);
inputChannel.send(message);
}
Message<?> message1 = outputChannel.receive(500);
Message<?> message2 = outputChannel.receive(500);
@@ -74,7 +73,7 @@ public class ResequencerParserTests {
ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context
.getBean("defaultResequencer");
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(resequencingHandler);
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("target"));
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 1000l,
messageHandlerFieldAccessor.getPropertyValue("sendTimeout"));
@@ -100,7 +99,7 @@ public class ResequencerParserTests {
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeResequencingMessageHandler);
Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate output channel",
outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
outputChannel, messageHandlerFieldAccessor.getPropertyValue("target"));
Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate discard channel",
discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 86420000l,

View File

@@ -21,9 +21,6 @@ import java.lang.reflect.Method;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.aop.support.DelegatingIntroductionInterceptor;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -32,8 +29,6 @@ import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.aggregator.SequenceSizeCompletionStrategy;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.config.MessageBusParser;
import org.springframework.integration.endpoint.DefaultEndpoint;
import org.springframework.integration.handler.MessageHandler;
/**
* @author Marius Bogoevici
@@ -49,7 +44,7 @@ public class AggregatorAnnotationTests {
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
endpointName);
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("outputChannel"));
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("target"));
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("discardChannel"));
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor
.getPropertyValue("sendTimeout"));
@@ -71,7 +66,7 @@ public class AggregatorAnnotationTests {
endpointName);
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), aggregatingMessageHandlerAccessor
.getPropertyValue("outputChannel"));
.getPropertyValue("target"));
Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), aggregatingMessageHandlerAccessor
.getPropertyValue("discardChannel"));
Assert.assertEquals(98765432l, aggregatingMessageHandlerAccessor
@@ -106,20 +101,8 @@ public class AggregatorAnnotationTests {
@SuppressWarnings("unchecked")
private DirectFieldAccessor getDirectFieldAccessorForAggregatingHandler(ApplicationContext context, final String endpointName) {
MessageBus messageBus = this.getMessageBus(context);
DefaultEndpoint<?> endpoint = (DefaultEndpoint<?>) messageBus.lookupEndpoint(endpointName + ".aggregator");
MessageHandler handler = (MessageHandler) new DirectFieldAccessor(endpoint).getPropertyValue("handler");
try {
if (AopUtils.isAopProxy(handler)) {
DelegatingIntroductionInterceptor interceptor = (DelegatingIntroductionInterceptor)
((Advised) handler).getAdvisors()[0].getAdvice();
Object delegate = new DirectFieldAccessor(interceptor).getPropertyValue("delegate");
return new DirectFieldAccessor(delegate);
}
}
catch (Exception e) {
// will return the accessor for the handler
}
return new DirectFieldAccessor(handler);
AggregatingMessageHandler endpoint = (AggregatingMessageHandler) messageBus.lookupEndpoint(endpointName + ".aggregator");
return new DirectFieldAccessor(endpoint);
}
private MessageBus getMessageBus(ApplicationContext context) {

View File

@@ -9,6 +9,8 @@
<message-bus/>
<channel id="inputChannel"/>
<channel id="outputChannel">
<queue capacity="5"/>
</channel>
@@ -17,9 +19,12 @@
<queue capacity="5"/>
</channel>
<resequencer id="defaultResequencer"/>
<resequencer id="defaultResequencer" input-channel="inputChannel"/>
<channel id="inputChannel2"/>
<resequencer id="completelyDefinedResequencer"
input-channel="inputChannel2"
output-channel="outputChannel"
discard-channel="discardChannel"
send-timeout="86420000"
@@ -29,4 +34,4 @@
timeout="42"
release-partial-sequences="false"/>
</beans:beans>
</beans:beans>