diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencerEndpoint.java similarity index 92% rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencerEndpoint.java index 6e636024ea..74f50e9333 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencerEndpoint.java @@ -36,16 +36,16 @@ import org.springframework.integration.message.MessageHeaders; * * @author Marius Bogoevici */ -public class ResequencingMessageHandler extends AbstractMessageBarrierEndpoint { +public class ResequencerEndpoint extends AbstractMessageBarrierEndpoint { private volatile boolean releasePartialSequences = true; - public ResequencingMessageHandler() { + public ResequencerEndpoint() { this(null); } - public ResequencingMessageHandler(ScheduledExecutorService executor) { + public ResequencerEndpoint(ScheduledExecutorService executor) { super(executor); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java index b5351822d1..3d473b7022 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java @@ -20,7 +20,7 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; -import org.springframework.integration.aggregator.ResequencingMessageHandler; +import org.springframework.integration.aggregator.ResequencerEndpoint; /** * Parser for the <resequencer> element. @@ -38,7 +38,7 @@ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { @Override protected Class getBeanClass(Element element) { - return ResequencingMessageHandler.class; + return ResequencerEndpoint.class; } @Override diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java similarity index 94% rename from org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerMessageHandlerTests.java rename to org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java index d02bff9504..ff4bed5400 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerMessageHandlerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/ResequencerEndpointTests.java @@ -25,7 +25,7 @@ import static org.junit.Assert.assertNull; import org.junit.Test; -import org.springframework.integration.aggregator.ResequencingMessageHandler; +import org.springframework.integration.aggregator.ResequencerEndpoint; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -34,11 +34,11 @@ import org.springframework.integration.message.MessageBuilder; /** * @author Marius Bogoevici */ -public class ResequencerMessageHandlerTests { +public class ResequencerEndpointTests { @Test public void testBasicResequencing() throws InterruptedException { - ResequencingMessageHandler resequencer = new ResequencingMessageHandler(); + ResequencerEndpoint resequencer = new ResequencerEndpoint(); resequencer.setReleasePartialSequences(false); QueueChannel replyChannel = new QueueChannel(); Message message1 = createMessage("123", "ABC", 3, 3, replyChannel); @@ -62,7 +62,7 @@ public class ResequencerMessageHandlerTests { @Test public void testResequencingWithIncompleteSequenceRelease() throws InterruptedException { - ResequencingMessageHandler resequencer = new ResequencingMessageHandler(); + ResequencerEndpoint resequencer = new ResequencerEndpoint(); resequencer.setReleasePartialSequences(true); QueueChannel replyChannel = new QueueChannel(); Message message1 = createMessage("123", "ABC", 4, 2, replyChannel); @@ -98,7 +98,7 @@ public class ResequencerMessageHandlerTests { @Test public void testResequencingWithCompleteSequenceRelease() throws InterruptedException { - ResequencingMessageHandler resequencer = new ResequencingMessageHandler(); + ResequencerEndpoint resequencer = new ResequencerEndpoint(); resequencer.setReleasePartialSequences(false); QueueChannel replyChannel = new QueueChannel(); Message message1 = createMessage("123", "ABC", 4, 2, replyChannel); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 904433bfc0..87e6050799 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -26,7 +26,7 @@ import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.aggregator.ResequencingMessageHandler; +import org.springframework.integration.aggregator.ResequencerEndpoint; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; @@ -70,56 +70,54 @@ public class ResequencerParserTests { @Test public void testDefaultResequencerProperties() { - ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context - .getBean("defaultResequencer"); - DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(resequencingHandler); - Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("target")); - Assert.assertNull(messageHandlerFieldAccessor.getPropertyValue("discardChannel")); - Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 1000l, - messageHandlerFieldAccessor.getPropertyValue("sendTimeout")); + ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("defaultResequencer"); + DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + Assert.assertNull(accessor.getPropertyValue("target")); + Assert.assertNull(accessor.getPropertyValue("discardChannel")); + Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value", + 1000l, accessor.getPropertyValue("sendTimeout")); Assert.assertEquals( - "The ResequencingMessageHandler is not configured with the appropriate 'send partial results on timeout' flag", - false, messageHandlerFieldAccessor.getPropertyValue("sendPartialResultOnTimeout")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate reaper interval", - 1000l, messageHandlerFieldAccessor.getPropertyValue("reaperInterval")); + "The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag", + false, accessor.getPropertyValue("sendPartialResultOnTimeout")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval", + 1000l, accessor.getPropertyValue("reaperInterval")); Assert.assertEquals( - "The ResequencingMessageHandler is not configured with the appropriate tracked correlationId capacity", - 1000, messageHandlerFieldAccessor.getPropertyValue("trackedCorrelationIdCapacity")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate timeout", - 60000l, messageHandlerFieldAccessor.getPropertyValue("timeout")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate 'release partial sequences' flag", - true, messageHandlerFieldAccessor.getPropertyValue("releasePartialSequences")); + "The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity", + 1000, accessor.getPropertyValue("trackedCorrelationIdCapacity")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout", + 60000l, accessor.getPropertyValue("timeout")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag", + true, accessor.getPropertyValue("releasePartialSequences")); } @Test public void testPropertyAssignment() throws Exception { - ResequencingMessageHandler completeResequencingMessageHandler = (ResequencingMessageHandler) context - .getBean("completelyDefinedResequencer"); + ResequencerEndpoint endpoint = (ResequencerEndpoint) context.getBean("completelyDefinedResequencer"); MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); MessageChannel discardChannel = (MessageChannel) context.getBean("discardChannel"); - DirectFieldAccessor messageHandlerFieldAccessor = new DirectFieldAccessor(completeResequencingMessageHandler); - Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate output channel", - outputChannel, messageHandlerFieldAccessor.getPropertyValue("target")); - Assert.assertEquals("The ResequencingMessageHandler is not injected with the appropriate discard channel", - discardChannel, messageHandlerFieldAccessor.getPropertyValue("discardChannel")); - Assert.assertEquals("The ResequencingMessageHandler is not set with the appropriate timeout value", 86420000l, - messageHandlerFieldAccessor.getPropertyValue("sendTimeout")); + DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate output channel", + outputChannel, accessor.getPropertyValue("target")); + Assert.assertEquals("The ResequencerEndpoint is not injected with the appropriate discard channel", + discardChannel, accessor.getPropertyValue("discardChannel")); + Assert.assertEquals("The ResequencerEndpoint is not set with the appropriate timeout value", + 86420000l, accessor.getPropertyValue("sendTimeout")); Assert.assertEquals( - "The ResequencingMessageHandler is not configured with the appropriate 'send partial results on timeout' flag", - true, messageHandlerFieldAccessor.getPropertyValue("sendPartialResultOnTimeout")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate reaper interval", - 135l, messageHandlerFieldAccessor.getPropertyValue("reaperInterval")); + "The ResequencerEndpoint is not configured with the appropriate 'send partial results on timeout' flag", + true, accessor.getPropertyValue("sendPartialResultOnTimeout")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate reaper interval", + 135l, accessor.getPropertyValue("reaperInterval")); Assert.assertEquals( - "The ResequencingMessageHandler is not configured with the appropriate tracked correlationId capacity", - 99, messageHandlerFieldAccessor.getPropertyValue("trackedCorrelationIdCapacity")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate timeout", - 42l, messageHandlerFieldAccessor.getPropertyValue("timeout")); - Assert.assertEquals("The ResequencingMessageHandler is not configured with the appropriate 'release partial sequences' flag", - false, messageHandlerFieldAccessor.getPropertyValue("releasePartialSequences")); + "The ResequencerEndpoint is not configured with the appropriate tracked correlationId capacity", + 99, accessor.getPropertyValue("trackedCorrelationIdCapacity")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate timeout", + 42l, accessor.getPropertyValue("timeout")); + Assert.assertEquals("The ResequencerEndpoint is not configured with the appropriate 'release partial sequences' flag", + false, accessor.getPropertyValue("releasePartialSequences")); } - private static Message createMessage(T payload, Object correlationId, int sequenceSize, int sequenceNumber, - MessageChannel outputChannel) { + private static Message createMessage(T payload, Object correlationId, + int sequenceSize, int sequenceNumber, MessageChannel outputChannel) { return MessageBuilder.fromPayload(payload) .setCorrelationId(correlationId) .setSequenceSize(sequenceSize)