Renamed the 'defaultReplyChannel' property to 'outputChannel' (and renamed the corresponding xml attributes) for the aggregator and resequencer.
This commit is contained in:
@@ -77,7 +77,7 @@ public class AggregatorParserTests {
|
||||
(AggregatingMessageHandler) endpoint.getHandler();
|
||||
TestAggregator testAggregator = (TestAggregator) context.getBean("aggregatorBean");
|
||||
CompletionStrategy completionStrategy = (CompletionStrategy) context.getBean("completionStrategy");
|
||||
MessageChannel defaultReplyChannel = (MessageChannel) context.getBean("replyChannel");
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
|
||||
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeAggregatingMessageHandler);
|
||||
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate Aggregator instance",
|
||||
@@ -85,8 +85,8 @@ public class AggregatorParserTests {
|
||||
Assert.assertEquals(
|
||||
"The AggregatingMessageHandler is not injected with the appropriate CompletionStrategy instance",
|
||||
completionStrategy, messageHandlerFieldAccessor.getPropertyValue("completionStrategy"));
|
||||
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate default reply channel",
|
||||
defaultReplyChannel, messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel"));
|
||||
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate output channel",
|
||||
outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertEquals("The AggregatingMessageHandler is not injected with the appropriate discard channel",
|
||||
discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals("The AggregatingMessageHandler is not set with the appropriate timeout value", 86420000l,
|
||||
@@ -114,8 +114,8 @@ public class AggregatorParserTests {
|
||||
for (Message<?> message : outboundMessages) {
|
||||
addingAggregator.handle(message);
|
||||
}
|
||||
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
|
||||
Message<?> response = replyChannel.receive();
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
Message<?> response = outputChannel.receive();
|
||||
Assert.assertEquals(6l, response.getPayload());
|
||||
}
|
||||
|
||||
@@ -144,11 +144,11 @@ public class AggregatorParserTests {
|
||||
aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null));
|
||||
aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null));
|
||||
aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null));
|
||||
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
|
||||
Message<?> reply = replyChannel.receive(0);
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
Message<?> reply = outputChannel.receive(0);
|
||||
Assert.assertNull(reply);
|
||||
aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null));
|
||||
reply = replyChannel.receive(0);
|
||||
reply = outputChannel.receive(0);
|
||||
Assert.assertNotNull(reply);
|
||||
Assert.assertEquals(11l, reply.getPayload());
|
||||
}
|
||||
@@ -160,12 +160,12 @@ public class AggregatorParserTests {
|
||||
|
||||
|
||||
private static <T> Message<T> createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber,
|
||||
MessageChannel replyChannel) {
|
||||
MessageChannel outputChannel) {
|
||||
return MessageBuilder.fromPayload(payload)
|
||||
.setCorrelationId(correlationId)
|
||||
.setSequenceSize(sequenceSize)
|
||||
.setSequenceNumber(sequenceNumber)
|
||||
.setReturnAddress(replyChannel).build();
|
||||
.setReturnAddress(outputChannel).build();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -49,17 +49,17 @@ public class ResequencerParserTests {
|
||||
public void testResequencing() {
|
||||
ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context
|
||||
.getBean("defaultResequencer");
|
||||
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
|
||||
outboundMessages.add(createMessage("123", "id1", 3, 3, replyChannel));
|
||||
outboundMessages.add(createMessage("789", "id1", 3, 1, replyChannel));
|
||||
outboundMessages.add(createMessage("456", "id1", 3, 2, replyChannel));
|
||||
outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel));
|
||||
outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel));
|
||||
outboundMessages.add(createMessage("456", "id1", 3, 2, outputChannel));
|
||||
for (Message<?> message : outboundMessages) {
|
||||
resequencingHandler.handle(message);
|
||||
}
|
||||
Message<?> message1 = replyChannel.receive(500);
|
||||
Message<?> message2 = replyChannel.receive(500);
|
||||
Message<?> message3 = replyChannel.receive(500);
|
||||
Message<?> message1 = outputChannel.receive(500);
|
||||
Message<?> message2 = outputChannel.receive(500);
|
||||
Message<?> message3 = outputChannel.receive(500);
|
||||
Assert.assertNotNull(message1);
|
||||
Assert.assertEquals(new Integer(1), message1.getHeaders().getSequenceNumber());
|
||||
Assert.assertNotNull(message2);
|
||||
@@ -73,7 +73,7 @@ public class ResequencerParserTests {
|
||||
ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context
|
||||
.getBean("defaultResequencer");
|
||||
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(resequencingHandler);
|
||||
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel"));
|
||||
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 1000l,
|
||||
messageHandlerFieldAccessor.getPropertyValue("sendTimeout"));
|
||||
@@ -95,11 +95,11 @@ public class ResequencerParserTests {
|
||||
public void testPropertyAssignment() throws Exception {
|
||||
ResequencingMessageHandler completeResequencingMessageHandler = (ResequencingMessageHandler) context
|
||||
.getBean("completelyDefinedResequencer");
|
||||
MessageChannel defaultReplyChannel = (MessageChannel) context.getBean("replyChannel");
|
||||
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
|
||||
MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel");
|
||||
DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeResequencingMessageHandler);
|
||||
Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate default reply channel",
|
||||
defaultReplyChannel, messageHandlerFieldAccessor.getPropertyValue("defaultReplyChannel"));
|
||||
Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate output channel",
|
||||
outputChannel, messageHandlerFieldAccessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate discard channel",
|
||||
discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 86420000l,
|
||||
@@ -119,12 +119,12 @@ public class ResequencerParserTests {
|
||||
}
|
||||
|
||||
private static <T> Message<T> createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber,
|
||||
MessageChannel replyChannel) {
|
||||
MessageChannel outputChannel) {
|
||||
return MessageBuilder.fromPayload(payload)
|
||||
.setCorrelationId(correlationId)
|
||||
.setSequenceSize(sequenceSize)
|
||||
.setSequenceNumber(sequenceNumber)
|
||||
.setReturnAddress(replyChannel)
|
||||
.setReturnAddress(outputChannel)
|
||||
.build();
|
||||
}
|
||||
|
||||
|
||||
@@ -10,14 +10,14 @@
|
||||
<message-bus/>
|
||||
|
||||
<channel id="inputChannel"/>
|
||||
<channel id="replyChannel"/>
|
||||
<channel id="outputChannel"/>
|
||||
<channel id="discardChannel"/>
|
||||
|
||||
<aggregator id="aggregatorWithReference" ref="aggregatorBean" input-channel="inputChannel"/>
|
||||
|
||||
<aggregator id="completelyDefinedAggregator"
|
||||
input-channel="inputChannel"
|
||||
output-channel="replyChannel"
|
||||
output-channel="outputChannel"
|
||||
discard-channel="discardChannel"
|
||||
ref="aggregatorBean"
|
||||
completion-strategy="completionStrategy"
|
||||
@@ -31,11 +31,11 @@
|
||||
ref="adderBean"
|
||||
method="add"
|
||||
input-channel="inputChannel"
|
||||
output-channel="replyChannel"/>
|
||||
output-channel="outputChannel"/>
|
||||
|
||||
<aggregator id="aggregatorWithPojoCompletionStrategy"
|
||||
input-channel="inputChannel"
|
||||
output-channel="replyChannel"
|
||||
output-channel="outputChannel"
|
||||
ref="adderBean"
|
||||
method="add"
|
||||
completion-strategy="pojoCompletionStrategy"
|
||||
|
||||
@@ -50,7 +50,7 @@ public class AggregatorAnnotationTests {
|
||||
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
|
||||
endpointName);
|
||||
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
|
||||
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("defaultReplyChannel"));
|
||||
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("outputChannel"));
|
||||
Assert.assertNull(aggregatingMessageHandlerAccessor.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals(AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT, aggregatingMessageHandlerAccessor
|
||||
.getPropertyValue("sendTimeout"));
|
||||
@@ -71,8 +71,8 @@ public class AggregatorAnnotationTests {
|
||||
DirectFieldAccessor aggregatingMessageHandlerAccessor = getDirectFieldAccessorForAggregatingHandler(context,
|
||||
endpointName);
|
||||
Assert.assertTrue(aggregatingMessageHandlerAccessor.getPropertyValue("completionStrategy") instanceof SequenceSizeCompletionStrategy);
|
||||
Assert.assertEquals(getMessageBus(context).lookupChannel("replyChannel"), aggregatingMessageHandlerAccessor
|
||||
.getPropertyValue("defaultReplyChannel"));
|
||||
Assert.assertEquals(getMessageBus(context).lookupChannel("outputChannel"), aggregatingMessageHandlerAccessor
|
||||
.getPropertyValue("outputChannel"));
|
||||
Assert.assertEquals(getMessageBus(context).lookupChannel("discardChannel"), aggregatingMessageHandlerAccessor
|
||||
.getPropertyValue("discardChannel"));
|
||||
Assert.assertEquals(98765432l, aggregatingMessageHandlerAccessor
|
||||
|
||||
@@ -32,7 +32,7 @@ import org.springframework.stereotype.Component;
|
||||
/**
|
||||
* @author Marius Bogoevici
|
||||
*/
|
||||
@MessageEndpoint(input = "inputChannel", output= "replyChannel")
|
||||
@MessageEndpoint(input = "inputChannel", output = "outputChannel")
|
||||
@Component("endpointWithCustomizedAnnotation")
|
||||
public class TestAnnotatedEndpointWithCustomizedAggregator {
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
|
||||
<channel id="inputChannel"/>
|
||||
|
||||
<channel id="replyChannel"/>
|
||||
<channel id="outputChannel"/>
|
||||
|
||||
<channel id="discardChannel"/>
|
||||
|
||||
|
||||
@@ -9,14 +9,14 @@
|
||||
|
||||
<message-bus/>
|
||||
|
||||
<channel id="replyChannel" />
|
||||
<channel id="outputChannel" />
|
||||
|
||||
<channel id="discardChannel" />
|
||||
|
||||
<resequencer id="defaultResequencer"/>
|
||||
|
||||
<resequencer id="completelyDefinedResequencer"
|
||||
default-reply-channel="replyChannel"
|
||||
output-channel="outputChannel"
|
||||
discard-channel="discardChannel"
|
||||
send-timeout="86420000"
|
||||
send-partial-result-on-timeout="true"
|
||||
|
||||
Reference in New Issue
Block a user