Force AbstractMessageBarrierEndpoint and subclasses to use the TaskScheduler supplied via TaskSchedulerAware for scheduling the ReaperThread. Introducing a Schedulers helper class for creating a default TaskScheduler.

This commit is contained in:
Marius Bogoevici
2008-10-06 04:52:03 +00:00
parent 08e6db266b
commit 2d4bb80b10
8 changed files with 195 additions and 125 deletions

View File

@@ -26,42 +26,45 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.StringMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.integration.scheduling.Schedulers;
import org.springframework.integration.scheduling.TaskScheduler;
/**
* @author Mark Fisher
*/
public class AggregatorEndpointTests {
private final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
private final TaskScheduler taskScheduler = Schedulers.createDefaultTaskScheduler(10);
private AggregatorEndpoint aggregator;
public AggregatorEndpointTests() {
this.executor.setMaxPoolSize(10);
this.executor.setQueueCapacity(0);
this.executor.afterPropertiesSet();
@Before
public void configureAggregator() {
this.aggregator = new AggregatorEndpoint(new TestAggregator());
this.aggregator.setTaskScheduler(this.taskScheduler);
this.taskScheduler.start();
this.aggregator.onStart();
}
@Test
public void testCompleteGroupWithinTimeout() throws InterruptedException {
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
executor.execute(new AggregatorTestTask(aggregator, message1, latch));
executor.execute(new AggregatorTestTask(aggregator, message2, latch));
executor.execute(new AggregatorTestTask(aggregator, message3, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -71,38 +74,36 @@ public class AggregatorEndpointTests {
@Test
public void testShouldNotSendPartialResultOnTimeoutByDefault() throws InterruptedException {
QueueChannel discardChannel = new QueueChannel();
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(50);
aggregator.setReaperInterval(10);
aggregator.setDiscardChannel(discardChannel);
this.aggregator.setTimeout(50);
this.aggregator.setReaperInterval(10);
this.aggregator.setDiscardChannel(discardChannel);
QueueChannel replyChannel = new QueueChannel();
Message<?> message = createMessage("123", "ABC", 2, 1, replyChannel);
CountDownLatch latch = new CountDownLatch(1);
AggregatorTestTask task = new AggregatorTestTask(aggregator, message, latch);
executor.execute(task);
AggregatorTestTask task = new AggregatorTestTask(this.aggregator, message, latch);
this.taskScheduler.execute(task);
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals("task should have completed within timeout", 0, latch.getCount());
Message<?> reply = replyChannel.receive(0);
assertNull(reply);
Message<?> discardedMessage = discardChannel.receive(1000);
Message<?> discardedMessage = discardChannel.receive(2000);
assertNotNull(discardedMessage);
assertEquals(message, discardedMessage);
}
@Test
public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedException {
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(500);
aggregator.setReaperInterval(10);
aggregator.setSendPartialResultOnTimeout(true);
this.aggregator.setTimeout(500);
this.aggregator.setReaperInterval(10);
this.aggregator.setSendPartialResultOnTimeout(true);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
CountDownLatch latch = new CountDownLatch(2);
AggregatorTestTask task1 = new AggregatorTestTask(aggregator, message1, latch);
AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch);
executor.execute(task1);
executor.execute(task2);
AggregatorTestTask task1 = new AggregatorTestTask(this.aggregator, message1, latch);
AggregatorTestTask task2 = new AggregatorTestTask(this.aggregator, message2, latch);
this.taskScheduler.execute(task1);
this.taskScheduler.execute(task2);
latch.await(3000, TimeUnit.MILLISECONDS);
assertEquals("handlers should have been invoked within time limit", 0, latch.getCount());
Message<?> reply = replyChannel.receive(3000);
@@ -114,7 +115,6 @@ public class AggregatorEndpointTests {
@Test
public void testMultipleGroupsSimultaneously() throws InterruptedException {
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel1 = new QueueChannel();
QueueChannel replyChannel2 = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel1);
@@ -124,12 +124,12 @@ public class AggregatorEndpointTests {
Message<?> message5 = createMessage("def", "XYZ", 3, 2, replyChannel2);
Message<?> message6 = createMessage("ghi", "XYZ", 3, 3, replyChannel2);
CountDownLatch latch = new CountDownLatch(6);
executor.execute(new AggregatorTestTask(aggregator, message1, latch));
executor.execute(new AggregatorTestTask(aggregator, message6, latch));
executor.execute(new AggregatorTestTask(aggregator, message2, latch));
executor.execute(new AggregatorTestTask(aggregator, message5, latch));
executor.execute(new AggregatorTestTask(aggregator, message3, latch));
executor.execute(new AggregatorTestTask(aggregator, message4, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message6, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message5, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message4, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply1 = replyChannel1.receive(500);
assertNotNull(reply1);
@@ -143,11 +143,10 @@ public class AggregatorEndpointTests {
public void testDiscardChannelForTrackedCorrelationId() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -155,16 +154,15 @@ public class AggregatorEndpointTests {
public void testTrackedCorrelationIdsCapacityAtLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-2", 2, 1, 1, replyChannel));
assertEquals("test-2", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-3", 3, 1, 1, replyChannel));
assertEquals("test-3", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", discardChannel.receive(100).getPayload());
}
@@ -172,42 +170,39 @@ public class AggregatorEndpointTests {
public void testTrackedCorrelationIdsCapacityPassesLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
this.aggregator.setTrackedCorrelationIdCapacity(3);
this.aggregator.setDiscardChannel(discardChannel);
this.aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-2", 2, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-2", 2, 1, 1, replyChannel));
assertEquals("test-2", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-3", 3, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-3", 3, 1, 1, replyChannel));
assertEquals("test-3", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-4", 4, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-4", 4, 1, 1, replyChannel));
assertEquals("test-4", replyChannel.receive(100).getPayload());
aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
this.aggregator.handle(createMessage("test-1b", 1, 1, 1, replyChannel));
assertEquals("test-1b", replyChannel.receive(100).getPayload());
assertNull(discardChannel.receive(0));
}
@Test(expected=MessageHandlingException.class)
public void testExceptionThrownIfNoCorrelationId() throws InterruptedException {
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
Message<?> message = createMessage("123", null, 2, 1, new QueueChannel());
aggregator.handle(message);
this.aggregator.handle(message);
}
@Test
public void testAdditionalMessageAfterCompletion() throws InterruptedException {
AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
Message<?> message4 = createMessage("abc", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(4);
executor.execute(new AggregatorTestTask(aggregator, message1, latch));
executor.execute(new AggregatorTestTask(aggregator, message2, latch));
executor.execute(new AggregatorTestTask(aggregator, message3, latch));
executor.execute(new AggregatorTestTask(aggregator, message4, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message1, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message2, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message3, latch));
this.taskScheduler.execute(new AggregatorTestTask(this.aggregator, message4, latch));
latch.await(1000, TimeUnit.MILLISECONDS);
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -216,26 +211,27 @@ public class AggregatorEndpointTests {
@Test
public void testNullReturningAggregator() throws InterruptedException {
NullReturningAggregator aggregator = new NullReturningAggregator();
AggregatorEndpoint aggregatorEndpoint = new AggregatorEndpoint(aggregator);
NullReturningAggregator nullReturningAggregator = new NullReturningAggregator();
AggregatorEndpoint aggregator = new AggregatorEndpoint(nullReturningAggregator);
// aggregator.setTaskScheduler(this.taskScheduler);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 3, replyChannel);
CountDownLatch latch = new CountDownLatch(3);
AggregatorTestTask task = new AggregatorTestTask(aggregatorEndpoint, message1, latch);
executor.execute(task);
AggregatorTestTask task2 = new AggregatorTestTask(aggregatorEndpoint, message2, latch);
executor.execute(task2);
AggregatorTestTask task3 = new AggregatorTestTask(aggregatorEndpoint, message3, latch);
executor.execute(task3);
AggregatorTestTask task = new AggregatorTestTask(aggregator, message1, latch);
this.taskScheduler.execute(task);
AggregatorTestTask task2 = new AggregatorTestTask(aggregator, message2, latch);
this.taskScheduler.execute(task2);
AggregatorTestTask task3 = new AggregatorTestTask(aggregator, message3, latch);
this.taskScheduler.execute(task3);
latch.await(1000, TimeUnit.MILLISECONDS);
assertNull(task.getException());
assertNull(task2.getException());
assertNull(task3.getException());
Message<?> reply = replyChannel.receive(500);
assertNull(reply);
assertEquals(true, aggregator.isAggregationComplete());
assertEquals(true, nullReturningAggregator.isAggregationComplete());
}
@@ -325,5 +321,11 @@ public class AggregatorEndpointTests {
}
}
}
@After
public void stopTaskScheduler() {
this.taskScheduler.stop();
this.aggregator.onStop();
}
}

View File

@@ -20,30 +20,45 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.aggregator.ResequencerEndpoint;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.scheduling.Schedulers;
import org.springframework.integration.scheduling.TaskScheduler;
/**
* @author Marius Bogoevici
*/
public class ResequencerEndpointTests {
private ResequencerEndpoint resequencer;
private TaskScheduler taskScheduler;
@Before
public void configureResequencer() {
this.resequencer = new ResequencerEndpoint();
this.taskScheduler = Schedulers.createDefaultTaskScheduler(10);
this.resequencer.setTaskScheduler(taskScheduler);
taskScheduler.start();
this.resequencer.onStart();
}
@Test
public void testBasicResequencing() throws InterruptedException {
ResequencerEndpoint resequencer = new ResequencerEndpoint();
resequencer.setReleasePartialSequences(false);
this.resequencer.setReleasePartialSequences(false);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 3, 3, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 3, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 3, 2, replyChannel);
resequencer.handle(message1);
resequencer.handle(message3);
resequencer.handle(message2);
this.resequencer.handle(message1);
this.resequencer.handle(message3);
this.resequencer.handle(message2);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -57,16 +72,15 @@ public class ResequencerEndpointTests {
@Test
public void testResequencingWithIncompleteSequenceRelease() throws InterruptedException {
ResequencerEndpoint resequencer = new ResequencerEndpoint();
resequencer.setReleasePartialSequences(true);
this.resequencer.setReleasePartialSequences(true);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 4, 2, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 4, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 4, 4, replyChannel);
Message<?> message4 = createMessage("XYZ", "ABC", 4, 3, replyChannel);
resequencer.handle(message1);
resequencer.handle(message2);
resequencer.handle(message3);
this.resequencer.handle(message1);
this.resequencer.handle(message2);
this.resequencer.handle(message3);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -77,7 +91,7 @@ public class ResequencerEndpointTests {
assertEquals(new Integer(2), reply2.getHeaders().getSequenceNumber());
assertNull(reply3);
// when sending the last message, the whole sequence must have been sent
resequencer.handle(message4);
this.resequencer.handle(message4);
reply3 = replyChannel.receive(0);
Message<?> reply4 = replyChannel.receive(0);
assertNotNull(reply3);
@@ -89,16 +103,15 @@ public class ResequencerEndpointTests {
@Test
public void testResequencingWithCompleteSequenceRelease() throws InterruptedException {
ResequencerEndpoint resequencer = new ResequencerEndpoint();
resequencer.setReleasePartialSequences(false);
this.resequencer.setReleasePartialSequences(false);
QueueChannel replyChannel = new QueueChannel();
Message<?> message1 = createMessage("123", "ABC", 4, 2, replyChannel);
Message<?> message2 = createMessage("456", "ABC", 4, 1, replyChannel);
Message<?> message3 = createMessage("789", "ABC", 4, 4, replyChannel);
Message<?> message4 = createMessage("XYZ", "ABC", 4, 3, replyChannel);
resequencer.handle(message1);
resequencer.handle(message2);
resequencer.handle(message3);
this.resequencer.handle(message1);
this.resequencer.handle(message2);
this.resequencer.handle(message3);
Message<?> reply1 = replyChannel.receive(0);
Message<?> reply2 = replyChannel.receive(0);
Message<?> reply3 = replyChannel.receive(0);
@@ -107,7 +120,7 @@ public class ResequencerEndpointTests {
assertNull(reply2);
assertNull(reply3);
// after sending the last message, the whole sequence should have been sent
resequencer.handle(message4);
this.resequencer.handle(message4);
reply1 = replyChannel.receive(0);
reply2 = replyChannel.receive(0);
reply3 = replyChannel.receive(0);
@@ -134,4 +147,9 @@ public class ResequencerEndpointTests {
return message;
}
@After
public void stopTaskScheduler() {
this.resequencer.onStop();
this.taskScheduler.stop();
}
}