Complementary to the fixes made by INT-576. Since the ID of a message will be preserved by components that broadcast messages (e.g. a pub-sub channel), multiple messages in a correlation group may have the same ID. Therefore, organizing the storage support of MessageBarrier as a Map is obsolete. Switched to Collection. Improving the performance of the Aggregator.

This commit is contained in:
Marius Bogoevici
2009-02-13 19:53:08 +00:00
parent b6e697103f
commit 977ab1b34a
6 changed files with 116 additions and 146 deletions

View File

@@ -34,6 +34,7 @@ import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.StringMessage;
@@ -66,9 +67,9 @@ public class AggregatorEndpointTests {
@Test
public void testCompleteGroupWithinTimeout() throws InterruptedException {
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel, null);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel, null);
CountDownLatch latch = new CountDownLatch(3);
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message2, latch));
@@ -80,34 +81,15 @@ public class AggregatorEndpointTests {
}
@Test
public void testCompleteGroupWithinTimeoutWithDuplicates() throws InterruptedException {
public void testCompleteGroupWithinTimeoutWithSameId() throws InterruptedException {
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel, "ID#1");
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel, "ID#1");
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel, "ID#1");
CountDownLatch latch = new CountDownLatch(3);
//for testing the duplication scenario, the messages must be processed synchronously
new AggregatorTestTask(this.aggregator, message1, latch).run();
new AggregatorTestTask(this.aggregator, message2, latch).run();
new AggregatorTestTask(this.aggregator, message2, latch).run();
new AggregatorTestTask(this.aggregator, message3, latch).run();
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
assertEquals("123456789", reply.getPayload());
}
@Test
public void testCompleteGroupWithinTimeoutWithInconsistentStructure() throws InterruptedException {
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message4 = createMessage("xyz", "ABC", 4, 3, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
//for testing the duplication scenario, the messages must be processed synchronously
new AggregatorTestTask(this.aggregator, message1, latch).run();
new AggregatorTestTask(this.aggregator, message2, latch).run();
new AggregatorTestTask(this.aggregator, message2, latch).run();
new AggregatorTestTask(this.aggregator, message3, latch).run();
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -121,7 +103,7 @@ public class AggregatorEndpointTests {
this.aggregator.setReaperInterval(10);
this.aggregator.setDiscardChannel(discardChannel);
QueueChannel replyChannel = new QueueChannel();
Message<?> message = createMessage("123", "ABC", 2, 1, replyChannel);
Message<?> message = createMessage("123", "ABC", 2, 1, replyChannel, null);
CountDownLatch latch = new CountDownLatch(1);
AggregatorTestTask task = new AggregatorTestTask(this.aggregator, message, latch);
this.taskExecutor.execute(task);
@@ -140,8 +122,8 @@ public class AggregatorEndpointTests {
this.aggregator.setReaperInterval(10);
this.aggregator.setSendPartialResultOnTimeout(true);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel, null);
CountDownLatch latch = new CountDownLatch(2);
AggregatorTestTask task1 = new AggregatorTestTask(this.aggregator, message1, latch);
AggregatorTestTask task2 = new AggregatorTestTask(this.aggregator, message2, latch);
@@ -160,12 +142,12 @@ public class AggregatorEndpointTests {
public void testMultipleGroupsSimultaneously() throws InterruptedException {
QueueChannel replyChannel1 = new QueueChannel();
QueueChannel replyChannel2 = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel1);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel1);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel1);
Message<?> message4 = createMessage("abc", "XYZ", 3, 1, replyChannel2);
Message<?> message5 = createMessage("def", "XYZ", 3, 2, replyChannel2);
Message<?> message6 = createMessage("ghi", "XYZ", 3, 3, replyChannel2);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel1, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel1, null);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel1, null);
Message<?> message4 = createMessage("abc", "XYZ", 3, 1, replyChannel2, null);
Message<?> message5 = createMessage("def", "XYZ", 3, 2, replyChannel2, null);
Message<?> message6 = createMessage("ghi", "XYZ", 3, 3, replyChannel2, null);
CountDownLatch latch = new CountDownLatch(6);
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message6, latch));
@@ -187,9 +169,9 @@ public class AggregatorEndpointTests {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel, null));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel, null));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -199,13 +181,13 @@ public class AggregatorEndpointTests {
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel, null));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel, null));
assertEquals("test-2", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel, null));
assertEquals("test-3", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel, null));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -215,32 +197,32 @@ public class AggregatorEndpointTests {
QueueChannel discardChannel = new QueueChannel();
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1a", 1, 1, 1, replyChannel, null));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-2", 2, 1, 1, replyChannel, null));
assertEquals("test-2", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-3", 3, 1, 1, replyChannel, null));
assertEquals("test-3", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-4", 4, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-4", 4, 1, 1, replyChannel, null));
assertEquals("test-4", replyChannel.receive(100).getPayload());
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handleMessage(createMessage("test-1b", 1, 1, 1, replyChannel, null));
assertEquals("test-1b", replyChannel.receive(100).getPayload());
assertNull(discardChannel.receive(0));
}
@Test(expected = MessageHandlingException.class)
public void testExceptionThrownIfNoCorrelationId() throws InterruptedException {
Message<?> message = createMessage("123", null, 2, 1, new QueueChannel());
Message<?> message = createMessage("123", null, 2, 1, new QueueChannel(), null);
this.aggregator.handleMessage(message);
}
@Test
public void testAdditionalMessageAfterCompletion() throws InterruptedException {
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message4 = createMessage("abc", "ABC", 3, 3, replyChannel);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel, null);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel, null);
Message<?> message4 = createMessage("abc", "ABC", 3, 3, replyChannel, null);
CountDownLatch latch = new CountDownLatch(4);
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskExecutor.execute(new AggregatorTestTask(this.aggregator, message2, latch));
@@ -257,9 +239,9 @@ public class AggregatorEndpointTests {
this.aggregator = new NullReturningAggregator();
this.aggregator.setTaskScheduler(this.taskScheduler);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel, null);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel, null);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel, null);
CountDownLatch latch = new CountDownLatch(3);
AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch);
this.taskExecutor.execute(task1);
@@ -278,14 +260,16 @@ public class AggregatorEndpointTests {
private static Message<?> createMessage(String payload, Object correlationId,
int sequenceSize, int sequenceNumber, MessageChannel replyChannel) {
Message<String> message = MessageBuilder.withPayload(payload)
.setCorrelationId(correlationId)
.setSequenceSize(sequenceSize)
.setSequenceNumber(sequenceNumber)
.setReplyChannel(replyChannel)
.build();
return message;
int sequenceSize, int sequenceNumber, MessageChannel replyChannel, String predefinedId) {
MessageBuilder<String> builder = MessageBuilder.withPayload(payload)
.setCorrelationId(correlationId)
.setSequenceSize(sequenceSize)
.setSequenceNumber(sequenceNumber)
.setReplyChannel(replyChannel);
if (predefinedId != null) {
builder.setHeader(MessageHeaders.ID, predefinedId);
}
return builder.build();
}

View File

@@ -17,12 +17,16 @@
package org.springframework.integration.aggregator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.LinkedHashSet;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.core.Message;
/**
* @author Mark Fisher
@@ -32,17 +36,17 @@ public class MessageBarrierTests {
@Test
public void testMessageRetrieval() {
MessageBarrier barrier = new MessageBarrier(new LinkedHashMap(), null);
barrier.getMessages().put("1", new StringMessage("test1"));
MessageBarrier barrier = new MessageBarrier(new LinkedHashSet(), null);
barrier.getMessages().add(new StringMessage("test1"));
assertEquals(1, barrier.getMessages().size());
barrier.getMessages().put("2", new StringMessage("test2"));
barrier.getMessages().add(new StringMessage("test2"));
assertEquals(2, barrier.getMessages().size());
}
@Test
public void testTimestamp() {
long before = System.currentTimeMillis();
MessageBarrier barrier = new MessageBarrier(new LinkedHashMap(), null);
MessageBarrier barrier = new MessageBarrier(new LinkedHashSet(), null);
long timestamp = barrier.getTimestamp();
assertTrue(before <= timestamp);
long after = System.currentTimeMillis();
@@ -51,7 +55,7 @@ public class MessageBarrierTests {
@Test
public void testEmptyMessageList() {
MessageBarrier barrier = new MessageBarrier(new LinkedHashMap(), null);
MessageBarrier barrier = new MessageBarrier(new LinkedHashSet(), null);
assertEquals(0, barrier.getMessages().size());
}