diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java similarity index 96% rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java index 7e2e7ad2d2..581d81833b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java @@ -32,7 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractInOutEndpoint; -import org.springframework.integration.handler.MessageHandler; +import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageHandlingException; @@ -43,12 +43,12 @@ import org.springframework.util.ObjectUtils; /** * Base class for {@link MessageBarrier}-based MessageHandlers. - * A {@link MessageHandler} implementation that waits for a group of + * A {@link MessageEndpoint} implementation that waits for a group of * {@link Message Messages} to arrive and processes them together. * Uses a {@link MessageBarrier} to store messages and to decide how * the messages should be released. *
- * Each {@link Message} that is received by this handler will be associated with
+ * Each {@link Message} that is received by this endpoint will be associated with
* a group based upon the 'correlationId' property of its
* header. If no such property is available, a {@link MessageHandlingException}
* will be thrown.
@@ -62,7 +62,7 @@ import org.springframework.util.ObjectUtils;
* @author Mark Fisher
* @author Marius Bogoevici
*/
-public abstract class AbstractMessageBarrierHandler extends AbstractInOutEndpoint implements InitializingBean {
+public abstract class AbstractMessageBarrierEndpoint extends AbstractInOutEndpoint implements InitializingBean {
public final static long DEFAULT_SEND_TIMEOUT = 1000;
@@ -96,7 +96,7 @@ public abstract class AbstractMessageBarrierHandler extends AbstractInOutEndpoin
private volatile boolean initialized;
- public AbstractMessageBarrierHandler(ScheduledExecutorService executor) {
+ public AbstractMessageBarrierEndpoint(ScheduledExecutorService executor) {
this.executor = (executor != null) ? executor : Executors.newSingleThreadScheduledExecutor();
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java
similarity index 88%
rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java
rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java
index 1b706bdd5c..110242618e 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorEndpoint.java
@@ -25,7 +25,7 @@ import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
- * An {@link AbstractMessageBarrierHandler} that waits for a complete
+ * An {@link AbstractMessageBarrierEndpoint} that waits for a complete
* group of {@link Message Messages} to arrive and then delegates to an
* {@link Aggregator} to combine them into a single {@link Message}.
*
@@ -34,13 +34,13 @@ import org.springframework.util.CollectionUtils; * custom implementation of the {@link CompletionStrategy} may be provided. *
* All considerations regarding timeout and grouping by
- * 'correlationId' from {@link AbstractMessageBarrierHandler} apply
+ * 'correlationId' from {@link AbstractMessageBarrierEndpoint} apply
* here as well.
*
* @author Mark Fisher
* @author Marius Bogoevici
*/
-public class AggregatingMessageHandler extends AbstractMessageBarrierHandler {
+public class AggregatorEndpoint extends AbstractMessageBarrierEndpoint {
private final Aggregator aggregator;
@@ -48,18 +48,18 @@ public class AggregatingMessageHandler extends AbstractMessageBarrierHandler {
/**
- * Create a handler that delegates to the provided aggregator to combine a
+ * Create an endpoint that delegates to the provided Aggregator to combine a
* group of messages into a single message. The executor will be used for
* scheduling a background maintenance thread. If null, a new
* single-threaded executor will be created.
*/
- public AggregatingMessageHandler(Aggregator aggregator, ScheduledExecutorService executor) {
+ public AggregatorEndpoint(Aggregator aggregator, ScheduledExecutorService executor) {
super(executor);
Assert.notNull(aggregator, "'aggregator' must not be null");
this.aggregator = aggregator;
}
- public AggregatingMessageHandler(Aggregator aggregator) {
+ public AggregatorEndpoint(Aggregator aggregator) {
this(aggregator, null);
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
similarity index 87%
rename from org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorAdapter.java
rename to org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
index 5e5e43fe69..9c15dd211d 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AggregatorAdapter.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/MethodInvokingAggregator.java
@@ -31,13 +31,13 @@ import org.springframework.integration.message.Message;
* @author Marius Bogoevici
* @author Mark Fisher
*/
-public class AggregatorAdapter extends MessageListMethodAdapter implements Aggregator {
+public class MethodInvokingAggregator extends MessageListMethodAdapter implements Aggregator {
- public AggregatorAdapter(Object object, Method method) {
+ public MethodInvokingAggregator(Object object, Method method) {
super(object, method);
}
- public AggregatorAdapter(Object object, String methodName) {
+ public MethodInvokingAggregator(Object object, String methodName) {
super(object, methodName);
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java
index 5fc0250f9d..6e636024ea 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/ResequencingMessageHandler.java
@@ -23,7 +23,7 @@ import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageHeaders;
/**
- * An {@link AbstractMessageBarrierHandler} that waits for a group of
+ * An {@link AbstractMessageBarrierEndpoint} that waits for a group of
* {@link Message Messages} to arrive and re-sends them in order, sorted
* by their sequenceNumber.
*
@@ -31,12 +31,12 @@ import org.springframework.integration.message.MessageHeaders; * wait for the whole sequence to arrive before re-sending them. *
* All considerations regarding timeout and grouping by
- * 'correlationId' from {@link AbstractMessageBarrierHandler} apply
+ * 'correlationId' from {@link AbstractMessageBarrierEndpoint} apply
* here as well.
*
* @author Marius Bogoevici
*/
-public class ResequencingMessageHandler extends AbstractMessageBarrierHandler {
+public class ResequencingMessageHandler extends AbstractMessageBarrierEndpoint {
private volatile boolean releasePartialSequences = true;
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
index fde5a8fe47..9c817c735a 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/annotation/Aggregator.java
@@ -22,7 +22,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.springframework.integration.aggregator.AggregatingMessageHandler;
+import org.springframework.integration.aggregator.AggregatorEndpoint;
/**
* Indicates that a method is capable of aggregating messages.
@@ -56,12 +56,12 @@ public @interface Aggregator {
/**
* timeout for sending results to the reply target (in milliseconds)
*/
- long sendTimeout() default AggregatingMessageHandler.DEFAULT_SEND_TIMEOUT;
+ long sendTimeout() default AggregatorEndpoint.DEFAULT_SEND_TIMEOUT;
/**
* maximum time to wait for completion (in milliseconds)
*/
- long timeout() default AggregatingMessageHandler.DEFAULT_TIMEOUT;
+ long timeout() default AggregatorEndpoint.DEFAULT_TIMEOUT;
/**
* indicates whether to send an incomplete aggregate on timeout
@@ -71,13 +71,13 @@ public @interface Aggregator {
/**
* interval for the task that checks for timed-out aggregates
*/
- long reaperInterval() default AggregatingMessageHandler.DEFAULT_REAPER_INTERVAL;
+ long reaperInterval() default AggregatorEndpoint.DEFAULT_REAPER_INTERVAL;
/**
* maximum number of correlation IDs to maintain so that received messages
* may be recognized as belonging to an aggregate that has already completed
* or timed out
*/
- int trackedCorrelationIdCapacity() default AggregatingMessageHandler.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY;
+ int trackedCorrelationIdCapacity() default AggregatorEndpoint.DEFAULT_TRACKED_CORRRELATION_ID_CAPACITY;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
index e5db8827e3..ac9d530fd9 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AggregatorParser.java
@@ -21,8 +21,8 @@ import org.w3c.dom.Element;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
-import org.springframework.integration.aggregator.AggregatingMessageHandler;
-import org.springframework.integration.aggregator.AggregatorAdapter;
+import org.springframework.integration.aggregator.AggregatorEndpoint;
+import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.util.StringUtils;
@@ -59,12 +59,12 @@ public class AggregatorParser extends AbstractEndpointParser {
@Override
protected Class extends MessageEndpoint> getEndpointClass() {
- return AggregatingMessageHandler.class;
+ return AggregatorEndpoint.class;
}
@Override
protected Class> getMethodInvokingAdapterClass() {
- return AggregatorAdapter.class;
+ return MethodInvokingAggregator.class;
}
@Override
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
index 6c75a5ee1e..d805020d63 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AggregatorAnnotationPostProcessor.java
@@ -21,8 +21,8 @@ import java.lang.reflect.Method;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.aggregator.AggregatingMessageHandler;
-import org.springframework.integration.aggregator.AggregatorAdapter;
+import org.springframework.integration.aggregator.AggregatorEndpoint;
+import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.CompletionStrategy;
@@ -47,13 +47,13 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP
@Override
protected Object createMethodInvokingAdapter(Object bean, Method method, Aggregator annotation) {
- return new AggregatorAdapter(bean, method);
+ return new MethodInvokingAggregator(bean, method);
}
@Override
protected AbstractEndpoint createEndpoint(Object originalBean, Object adapter) {
if (adapter instanceof org.springframework.integration.aggregator.Aggregator) {
- AggregatingMessageHandler endpoint = new AggregatingMessageHandler((org.springframework.integration.aggregator.Aggregator) adapter);
+ AggregatorEndpoint endpoint = new AggregatorEndpoint((org.springframework.integration.aggregator.Aggregator) adapter);
this.configureCompletionStrategy(originalBean, endpoint);
return endpoint;
}
@@ -63,7 +63,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP
@Override
protected void configureEndpoint(AbstractEndpoint endpoint, Aggregator annotation, Poller pollerAnnotation) {
super.configureEndpoint(endpoint, annotation, pollerAnnotation);
- AggregatingMessageHandler aggregatorEndpoint = (AggregatingMessageHandler) endpoint;
+ AggregatorEndpoint aggregatorEndpoint = (AggregatorEndpoint) endpoint;
String discardChannelName = annotation.discardChannel();
if (StringUtils.hasText(discardChannelName)) {
MessageChannel discardChannel = this.getChannelRegistry().lookupChannel(discardChannelName);
@@ -80,7 +80,7 @@ public class AggregatorAnnotationPostProcessor extends AbstractMethodAnnotationP
aggregatorEndpoint.afterPropertiesSet();
}
- private void configureCompletionStrategy(final Object object, final AggregatingMessageHandler handler) {
+ private void configureCompletionStrategy(final Object object, final AggregatorEndpoint handler) {
ReflectionUtils.doWithMethods(object.getClass(), new ReflectionUtils.MethodCallback() {
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
Annotation annotation = AnnotationUtils.getAnnotation(method, CompletionStrategy.class);
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatingMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
similarity index 91%
rename from org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatingMessageHandlerTests.java
rename to org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
index e16b88703e..2a68a1077d 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatingMessageHandlerTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/AggregatorEndpointTests.java
@@ -39,12 +39,12 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* @author Mark Fisher
*/
-public class AggregatingMessageHandlerTests {
+public class AggregatorEndpointTests {
private final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- public AggregatingMessageHandlerTests() {
+ public AggregatorEndpointTests() {
this.executor.setMaxPoolSize(10);
this.executor.setQueueCapacity(0);
this.executor.afterPropertiesSet();
@@ -53,7 +53,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testCompleteGroupWithinTimeout() throws InterruptedException {
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
@@ -71,7 +71,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testShouldNotSendPartialResultOnTimeoutByDefault() throws InterruptedException {
QueueChannel discardChannel = new QueueChannel();
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(50);
aggregator.setReaperInterval(10);
aggregator.setDiscardChannel(discardChannel);
@@ -91,7 +91,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testShouldSendPartialResultOnTimeoutTrue() throws InterruptedException {
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTimeout(500);
aggregator.setReaperInterval(10);
aggregator.setSendPartialResultOnTimeout(true);
@@ -114,7 +114,7 @@ public class AggregatingMessageHandlerTests {
@Test
public void testMultipleGroupsSimultaneously() throws InterruptedException {
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel1 = new QueueChannel();
QueueChannel replyChannel2 = new QueueChannel();
Message> message1 = createMessage("123", "ABC", 3, 1, replyChannel1);
@@ -143,7 +143,7 @@ public class AggregatingMessageHandlerTests {
public void testDiscardChannelForTrackedCorrelationId() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
assertEquals("test-1a", replyChannel.receive(100).getPayload());
@@ -155,7 +155,7 @@ public class AggregatingMessageHandlerTests {
public void testTrackedCorrelationIdsCapacityAtLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
@@ -172,7 +172,7 @@ public class AggregatingMessageHandlerTests {
public void testTrackedCorrelationIdsCapacityPassesLimit() {
QueueChannel replyChannel = new QueueChannel();
QueueChannel discardChannel = new QueueChannel();
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
aggregator.setTrackedCorrelationIdCapacity(3);
aggregator.setDiscardChannel(discardChannel);
aggregator.handle(createMessage("test-1a", 1, 1, 1, replyChannel));
@@ -190,14 +190,14 @@ public class AggregatingMessageHandlerTests {
@Test(expected=MessageHandlingException.class)
public void testExceptionThrownIfNoCorrelationId() throws InterruptedException {
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
Message> message = createMessage("123", null, 2, 1, new QueueChannel());
aggregator.handle(message);
}
@Test
public void testAdditionalMessageAfterCompletion() throws InterruptedException {
- AggregatingMessageHandler aggregator = new AggregatingMessageHandler(new TestAggregator());
+ AggregatorEndpoint aggregator = new AggregatorEndpoint(new TestAggregator());
QueueChannel replyChannel = new QueueChannel();
Message> message1 = createMessage("123", "ABC", 3, 1, replyChannel);
Message> message2 = createMessage("456", "ABC", 3, 2, replyChannel);
@@ -243,7 +243,7 @@ public class AggregatingMessageHandlerTests {
private static class AggregatorTestTask implements Runnable {
- private AggregatingMessageHandler aggregator;
+ private AggregatorEndpoint aggregator;
private Message> message;
@@ -252,7 +252,7 @@ public class AggregatingMessageHandlerTests {
private CountDownLatch latch;
- AggregatorTestTask(AggregatingMessageHandler aggregator, Message> message, CountDownLatch latch) {
+ AggregatorTestTask(AggregatorEndpoint aggregator, Message> message, CountDownLatch latch) {
this.aggregator = aggregator;
this.message = message;
this.latch = latch;
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/CompletionStrategyAdapterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/CompletionStrategyAdapterTests.java
index 0560613194..ee51b363d4 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/CompletionStrategyAdapterTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/aggregator/CompletionStrategyAdapterTests.java
@@ -25,7 +25,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.aggregator.AggregatorAdapter;
+import org.springframework.integration.aggregator.MethodInvokingAggregator;
import org.springframework.integration.aggregator.CompletionStrategy;
import org.springframework.integration.aggregator.CompletionStrategyAdapter;
import org.springframework.integration.message.GenericMessage;
@@ -130,7 +130,7 @@ public class CompletionStrategyAdapterTests {
@Test(expected = ConfigurationException.class)
public void testInvalidParameterTypeUsingMethodObject() throws SecurityException, NoSuchMethodException {
- new AggregatorAdapter(simpleCompletionStrategy, simpleCompletionStrategy.getClass().getMethod(
+ new MethodInvokingAggregator(simpleCompletionStrategy, simpleCompletionStrategy.getClass().getMethod(
"invalidParameterType", String.class));
}
@@ -160,19 +160,19 @@ public class CompletionStrategyAdapterTests {
@Test(expected = IllegalArgumentException.class)
public void testNullObject() {
- new AggregatorAdapter(null, "doesNotMatter");
+ new MethodInvokingAggregator(null, "doesNotMatter");
}
@Test(expected = IllegalArgumentException.class)
public void testNullMethodName() {
String methodName = null;
- new AggregatorAdapter(simpleCompletionStrategy, methodName);
+ new MethodInvokingAggregator(simpleCompletionStrategy, methodName);
}
@Test(expected = IllegalArgumentException.class)
public void testNullMethodObject() {
Method method = null;
- new AggregatorAdapter(simpleCompletionStrategy, method);
+ new MethodInvokingAggregator(simpleCompletionStrategy, method);
}
private static List