diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java index 98641cbe75..3df74bc060 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/AbstractRemotingOutboundGateway.java @@ -20,7 +20,7 @@ import java.io.Serializable; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageHandlingException; import org.springframework.remoting.RemoteAccessException; @@ -50,7 +50,8 @@ public abstract class AbstractRemotingOutboundGateway extends AbstractReplyProdu protected abstract MessageHandler createHandlerProxy(String url); - public final void handle(Message message, ReplyHolder replyHolder) { + @Override + public final void onMessage(Message message, ReplyMessageHolder replyHolder) { this.verifySerializability(message); try { Message reply = this.handlerProxy.handle(message); diff --git a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/AbstractWebServiceOutboundGateway.java b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/AbstractWebServiceOutboundGateway.java index c635846c07..7fb766ad6c 100644 --- a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/AbstractWebServiceOutboundGateway.java +++ b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/AbstractWebServiceOutboundGateway.java @@ -21,7 +21,7 @@ import java.net.URI; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.util.Assert; import org.springframework.ws.WebServiceMessage; @@ -85,7 +85,7 @@ public abstract class AbstractWebServiceOutboundGateway extends AbstractReplyPro } @Override - public final void handle(Message message, ReplyHolder replyHolder) { + public final void onMessage(Message message, ReplyMessageHolder replyHolder) { Object responsePayload = this.doHandle(message.getPayload(), this.getRequestCallback(message)); if (responsePayload != null) { replyHolder.set(responsePayload); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierConsumer.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierConsumer.java index 261b005f73..88d2f7340f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierConsumer.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierConsumer.java @@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageHandlingException; @@ -167,7 +167,7 @@ public abstract class AbstractMessageBarrierConsumer extends AbstractReplyProduc } @Override - protected final void handle(Message message, ReplyHolder replyHolder) { + protected final void onMessage(Message message, ReplyMessageHolder replyHolder) { if (!this.initialized) { this.afterPropertiesSet(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractReplyProducingMessageConsumer.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractReplyProducingMessageConsumer.java index 6d10b07bb1..ab5f7b92ab 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractReplyProducingMessageConsumer.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractReplyProducingMessageConsumer.java @@ -97,16 +97,16 @@ public abstract class AbstractReplyProducingMessageConsumer extends AbstractMess if (!this.supports(message)) { throw new MessageRejectedException(message, "unsupported message"); } - ReplyHolder replyHolder = new ReplyHolder(); - this.handle(message, replyHolder); - if (replyHolder.isEmpty()) { + ReplyMessageHolder replyMessageHolder = new ReplyMessageHolder(); + this.onMessage(message, replyMessageHolder); + if (replyMessageHolder.isEmpty()) { if (this.requiresReply) { throw new MessageHandlingException(message, "consumer '" + this + "' requires a reply, but no reply was received"); } return; } - Object targetChannelValue = replyHolder.getTargetChannel(); + Object targetChannelValue = replyMessageHolder.getTargetChannel(); MessageChannel replyChannel = null; if (targetChannelValue == null) { replyChannel = this.resolveReplyChannel(message); @@ -115,13 +115,13 @@ public abstract class AbstractReplyProducingMessageConsumer extends AbstractMess replyChannel = this.channelResolver.resolveChannelName((String) targetChannelValue); } MessageHeaders requestHeaders = message.getHeaders(); - for (MessageBuilder builder : replyHolder.builders()) { + for (MessageBuilder builder : replyMessageHolder.builders()) { builder.copyHeadersIfAbsent(requestHeaders); this.sendReplyMessage(builder.build(), replyChannel); } } - protected abstract void handle(Message message, ReplyHolder replyHolder); + protected abstract void onMessage(Message requestMessage, ReplyMessageHolder replyMessageHolder); protected boolean supports(Message message) { if (this.selector != null && !this.selector.accept(message)) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyHolder.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyMessageHolder.java similarity index 73% rename from org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyHolder.java rename to org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyMessageHolder.java index 41aaf54d67..c84af169b1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyHolder.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ReplyMessageHolder.java @@ -27,19 +27,19 @@ import org.springframework.integration.message.MessageBuilder; /** * @author Mark Fisher */ -public class ReplyHolder { +public class ReplyMessageHolder { private final List> builders = new ArrayList>(); private volatile Object targetChannel; - public MessageBuilder set(Object replyObject) { - return this.createAndAddBuilder(replyObject, true); + public MessageBuilder set(Object messageOrPayload) { + return this.createAndAddBuilder(messageOrPayload, true); } - public MessageBuilder add(Object replyObject) { - return this.createAndAddBuilder(replyObject, false); + public MessageBuilder add(Object messageOrPayload) { + return this.createAndAddBuilder(messageOrPayload, false); } public void setTargetChannel(MessageChannel targetChannel) { @@ -62,16 +62,16 @@ public class ReplyHolder { return Collections.unmodifiableList(this.builders); } - private MessageBuilder createAndAddBuilder(Object replyObject, boolean clearExistingValues) { + private MessageBuilder createAndAddBuilder(Object messageOrPayload, boolean clearExistingValues) { MessageBuilder builder = null; - if (replyObject instanceof MessageBuilder) { - builder = (MessageBuilder) replyObject; + if (messageOrPayload instanceof MessageBuilder) { + builder = (MessageBuilder) messageOrPayload; } - else if (replyObject instanceof Message) { - builder = MessageBuilder.fromMessage((Message) replyObject); + else if (messageOrPayload instanceof Message) { + builder = MessageBuilder.fromMessage((Message) messageOrPayload); } else { - builder = MessageBuilder.withPayload(replyObject); + builder = MessageBuilder.withPayload(messageOrPayload); } synchronized (this.builders) { if (clearExistingValues) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java index d7f19c30e8..0f01982e08 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java @@ -63,7 +63,7 @@ public class ServiceActivatorEndpoint extends AbstractReplyProducingMessageConsu } @Override - protected void handle(Message message, ReplyHolder replyHolder) { + protected void onMessage(Message message, ReplyMessageHolder replyHolder) { try { Object result = this.invoker.invokeMethod(message); if (result != null) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java b/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java index c34765456b..ad911971d8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/filter/MessageFilter.java @@ -17,7 +17,7 @@ package org.springframework.integration.filter; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.integration.message.selector.MessageSelector; import org.springframework.util.Assert; @@ -41,7 +41,7 @@ public class MessageFilter extends AbstractReplyProducingMessageConsumer { @Override - protected void handle(Message message, ReplyHolder replyHolder) { + protected void onMessage(Message message, ReplyMessageHolder replyHolder) { if (this.selector.accept(message)) { replyHolder.set(message); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java index 56b59b0590..8881303349 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java @@ -26,7 +26,7 @@ import org.springframework.integration.endpoint.AbstractReplyProducingMessageCon import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.MessagingGateway; import org.springframework.integration.endpoint.PollingConsumerEndpoint; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageConsumer; @@ -155,7 +155,7 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess MessageEndpoint correlator = null; MessageConsumer consumer = new AbstractReplyProducingMessageConsumer() { @Override - protected void handle(Message message, ReplyHolder replyHolder) { + protected void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); } }; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java index d7239fce0d..bdd052b011 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java @@ -19,7 +19,7 @@ package org.springframework.integration.splitter; import java.util.Collection; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; /** @@ -30,7 +30,7 @@ import org.springframework.integration.message.Message; public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageConsumer { @Override - protected final void handle(Message message, ReplyHolder replyHolder) { + protected final void onMessage(Message message, ReplyMessageHolder replyHolder) { Object result = this.splitMessage(message); if (result == null) { return; @@ -57,7 +57,7 @@ public abstract class AbstractMessageSplitter extends AbstractReplyProducingMess } } - private void addReply(ReplyHolder replyHolder, Object item, Object correlationId, int sequenceNumber, int sequenceSize) { + private void addReply(ReplyMessageHolder replyHolder, Object item, Object correlationId, int sequenceNumber, int sequenceSize) { replyHolder.add(item).setCorrelationId(correlationId) .setSequenceNumber(sequenceNumber) .setSequenceSize(sequenceSize); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/MessageTransformingConsumer.java b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/MessageTransformingConsumer.java index 934a1e466c..ef01b36a0a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/transformer/MessageTransformingConsumer.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/transformer/MessageTransformingConsumer.java @@ -17,7 +17,7 @@ package org.springframework.integration.transformer; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.util.Assert; @@ -44,7 +44,7 @@ public class MessageTransformingConsumer extends AbstractReplyProducingMessageCo @Override - protected void handle(Message message, ReplyHolder replyHolder) { + protected void onMessage(Message message, ReplyMessageHolder replyHolder) { Message result = transformer.transform(message); if (result != null) { replyHolder.set(result); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 1d70353b3b..2119f58789 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -37,7 +37,7 @@ import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; import org.springframework.integration.endpoint.PollingConsumerEndpoint; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.ErrorMessage; @@ -67,7 +67,7 @@ public class DefaultMessageBusTests { .setReturnAddress("targetChannel").build(); sourceChannel.send(message); AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); } }; @@ -125,12 +125,12 @@ public class DefaultMessageBusTests { QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); AbstractReplyProducingMessageConsumer consumer1 = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); } }; AbstractReplyProducingMessageConsumer consumer2 = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); } }; @@ -167,13 +167,13 @@ public class DefaultMessageBusTests { QueueChannel outputChannel2 = new QueueChannel(); final CountDownLatch latch = new CountDownLatch(2); AbstractReplyProducingMessageConsumer consumer1 = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); latch.countDown(); } }; AbstractReplyProducingMessageConsumer consumer2 = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message); latch.countDown(); } @@ -245,7 +245,7 @@ public class DefaultMessageBusTests { context.getBeanFactory().registerSingleton(DefaultMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel); final CountDownLatch latch = new CountDownLatch(1); AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { latch.countDown(); } }; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 9351f5bafd..f0070cc73f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -30,7 +30,7 @@ import org.springframework.integration.channel.ThreadLocalChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.endpoint.SubscribingConsumerEndpoint; import org.springframework.integration.message.Message; @@ -96,7 +96,7 @@ public class DirectChannelSubscriptionTests { @Test(expected = MessagingException.class) public void exceptionThrownFromRegisteredEndpoint() { AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { throw new RuntimeException("intentional test failure"); } }; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java index 22dfd8814f..1809ab94a3 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java @@ -33,7 +33,7 @@ import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.bus.DefaultMessageBus; import org.springframework.integration.endpoint.AbstractReplyProducingMessageConsumer; import org.springframework.integration.endpoint.PollingConsumerEndpoint; -import org.springframework.integration.endpoint.ReplyHolder; +import org.springframework.integration.endpoint.ReplyMessageHolder; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; @@ -52,7 +52,7 @@ public class MessageChannelTemplateTests { this.requestChannel = new QueueChannel(); this.requestChannel.setBeanName("requestChannel"); AbstractReplyProducingMessageConsumer consumer = new AbstractReplyProducingMessageConsumer() { - public void handle(Message message, ReplyHolder replyHolder) { + public void onMessage(Message message, ReplyMessageHolder replyHolder) { replyHolder.set(message.getPayload().toString().toUpperCase()); } };