diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingGateway.java
index 216ea96601..4367dc641e 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingGateway.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessagingGateway.java
@@ -35,4 +35,6 @@ public interface MessagingGateway {
Message> sendAndReceiveMessage(Object object);
+ void receiveAndForward();
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
index 8926f27ae6..224144feba 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
@@ -74,7 +74,7 @@ public class GatewayProxyFactoryBean extends SimpleMessagingGateway
}
public void setBeanFactory(BeanFactory beanFactory) {
- this.getRequestReplyTemplate().setMessageBus(
+ this.setMessageBus(
(MessageBus) beanFactory.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME));
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java
index ded001e417..1fa2ce11c5 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java
@@ -16,54 +16,48 @@
package org.springframework.integration.gateway;
-import org.springframework.integration.bus.MessageBus;
-import org.springframework.integration.bus.MessageBusAware;
import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.message.MessageExchangeTemplate;
/**
- * A convenient base class providing access to a {@link RequestReplyTemplate} and exposing setter methods for
+ * A convenient base class providing access to a {@link MessageExchangeTemplate} and exposing setter methods for
* configuring request and reply {@link MessageChannel MessageChannels}. May be used as a base class for framework
* components so that the details of messaging are well-encapsulated and hidden from application code. For example,
* see {@link SimpleMessagingGateway}.
*
* @author Mark Fisher
*/
-public abstract class MessagingGatewaySupport implements MessageBusAware {
+public abstract class MessagingGatewaySupport {
- private final RequestReplyTemplate requestReplyTemplate = new RequestReplyTemplate();
+ private final MessageExchangeTemplate messageExchangeTemplate = new MessageExchangeTemplate();
- public MessagingGatewaySupport(MessageChannel requestChannel) {
- this.requestReplyTemplate.setRequestChannel(requestChannel);
- }
-
- public MessagingGatewaySupport() {
- super();
- }
-
-
- public void setMessageBus(MessageBus messageBus) {
- this.requestReplyTemplate.setMessageBus(messageBus);
- }
-
- public void setRequestChannel(MessageChannel requestChannel) {
- this.requestReplyTemplate.setRequestChannel(requestChannel);
- }
-
- public void setReplyChannel(MessageChannel replyChannel) {
- this.requestReplyTemplate.setReplyChannel(replyChannel);
- }
-
+ /**
+ * Set the timeout value for sending request messages. If not
+ * explicitly configured, the default is an indefinite timeout.
+ *
+ * @param requestTimeout the timeout value in milliseconds
+ */
public void setRequestTimeout(long requestTimeout) {
- this.requestReplyTemplate.setRequestTimeout(requestTimeout);
+ this.messageExchangeTemplate.setSendTimeout(requestTimeout);
}
+ /**
+ * Set the timeout value for receiving reply messages. If not
+ * explicitly configured, the default is an indefinite timeout.
+ *
+ * @param replyTimeout the timeout value in milliseconds
+ */
public void setReplyTimeout(long replyTimeout) {
- this.requestReplyTemplate.setReplyTimeout(replyTimeout);
+ this.messageExchangeTemplate.setReceiveTimeout(replyTimeout);
}
- protected final RequestReplyTemplate getRequestReplyTemplate() {
- return this.requestReplyTemplate;
+ /**
+ * Retrieve the {@link MessageExchangeTemplate} for performing
+ * send and receive operations across channels.
+ */
+ protected final MessageExchangeTemplate getMessageExchangeTemplate() {
+ return this.messageExchangeTemplate;
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java
deleted file mode 100644
index 3ae9ad8f4a..0000000000
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.gateway;
-
-import java.util.List;
-
-import org.springframework.integration.ConfigurationException;
-import org.springframework.integration.bus.MessageBus;
-import org.springframework.integration.bus.MessageBusAware;
-import org.springframework.integration.channel.MessageChannel;
-import org.springframework.integration.channel.RendezvousChannel;
-import org.springframework.integration.endpoint.EndpointRegistry;
-import org.springframework.integration.endpoint.HandlerEndpoint;
-import org.springframework.integration.handler.ReplyMessageCorrelator;
-import org.springframework.integration.message.Message;
-import org.springframework.integration.message.MessageBuilder;
-import org.springframework.integration.message.MessageDeliveryException;
-import org.springframework.integration.message.MessageTarget;
-import org.springframework.integration.message.MessagingException;
-import org.springframework.integration.message.selector.MessageSelector;
-
-/**
- * A template that facilitates the implementation of request-reply usage
- * scenarios above one-way {@link MessageChannel MessageChannels}.
- *
- * @author Mark Fisher
- */
-public class RequestReplyTemplate implements MessageBusAware {
-
- private MessageChannel requestChannel;
-
- private MessageChannel replyChannel;
-
- private volatile long requestTimeout = -1;
-
- private volatile long replyTimeout = -1;
-
- private ReplyMessageCorrelator replyMessageCorrelator;
-
- private EndpointRegistry endpointRegistry;
-
- private final Object replyMessageCorrelatorMonitor = new Object();
-
-
- /**
- * Create a RequestReplyTemplate.
- *
- * @param requestChannel the channel to which request messages will be sent
- * @param replyChannel the channel from which reply messages will be received
- */
- public RequestReplyTemplate(MessageChannel requestChannel, MessageChannel replyChannel) {
- this.requestChannel = requestChannel;
- this.replyChannel = replyChannel;
- }
-
- /**
- * Create a RequestReplyTemplate that will use anonymous temporary channels for replies.
- *
- * @param requestChannel the channel to which request messages will be sent
- */
- public RequestReplyTemplate(MessageChannel requestChannel) {
- this(requestChannel, null);
- }
-
- public RequestReplyTemplate() {
- }
-
-
- /**
- * Set the request channel.
- *
- * @param requestChannel the channel to which request messages will be sent
- */
- public void setRequestChannel(MessageChannel requestChannel) {
- this.requestChannel = requestChannel;
- }
-
- /**
- * Set the reply channel. If no reply channel is provided, this template will
- * always use an anonymous, temporary channel for handling replies.
- *
- * @param replyChannel the channel from which reply messages will be received
- */
- public void setReplyChannel(MessageChannel replyChannel) {
- this.replyChannel = replyChannel;
- }
-
- /**
- * Set the timeout value for sending request messages. If not
- * explicitly configured, the default is an indefinite timeout.
- *
- * @param requestTimeout the timeout value in milliseconds
- */
- public void setRequestTimeout(long requestTimeout) {
- this.requestTimeout = requestTimeout;
- }
-
- /**
- * Set the timeout value for receiving reply messages. If not
- * explicitly configured, the default is an indefinite timeout.
- *
- * @param replyTimeout the timeout value in milliseconds
- */
- public void setReplyTimeout(long replyTimeout) {
- this.replyTimeout = replyTimeout;
- }
-
- public void setEndpointRegistry(EndpointRegistry endpointRegistry) {
- this.endpointRegistry = endpointRegistry;
- }
-
- public void setMessageBus(MessageBus messageBus) {
- if (this.endpointRegistry == null) {
- this.setEndpointRegistry(messageBus);
- }
- }
-
- public boolean send(Message> message) {
- if (message == null) {
- throw new MessageDeliveryException(message, "Message must not be null.");
- }
- if (this.requestChannel == null) {
- throw new MessageDeliveryException(message,
- "No request channel has been configured. Cannot send message.");
- }
- boolean sent = (this.requestTimeout >= 0) ?
- this.requestChannel.send(message, this.requestTimeout) : this.requestChannel.send(message);
- if (!sent) {
- throw new MessageDeliveryException(message, "Failed to send request message.");
- }
- return true;
- }
-
- public Message> receive() {
- if (this.replyChannel == null) {
- throw new MessagingException(
- "No reply channel has been configured. Cannot perform receive only operation.");
- }
- return this.receiveResponse(this.replyChannel);
- }
-
- /**
- * Send a request message whose reply should be sent to the provided target.
- */
- public boolean request(Message> message, MessageTarget target) {
- MessageChannel replyChannelAdapter = new ReplyHandlingChannelAdapter(target);
- Message> requestMessage = MessageBuilder.fromMessage(message)
- .setReturnAddress(replyChannelAdapter).build();
- return this.send(requestMessage);
- }
-
- /**
- * Send a request message and wait for a reply message using the configured
- * timeout values.
- *
- * @param requestMessage the request message to send
- *
- * @return the reply message or null
- */
- public Message> request(Message> message) {
- if (this.requestChannel == null) {
- throw new MessageDeliveryException(message,
- "No request channel available. Cannot send request message.");
- }
- if (this.replyChannel != null) {
- return this.sendAndReceiveWithReplyMessageCorrelator(message);
- }
- else {
- return this.sendAndReceiveWithTemporaryChannel(message);
- }
- }
-
- private Message> sendAndReceiveWithReplyMessageCorrelator(Message> message) {
- if (this.replyMessageCorrelator == null) {
- this.registerReplyMessageCorrelator();
- }
- message = MessageBuilder.fromMessage(message)
- .setReturnAddress(this.replyChannel).build();
- this.send(message);
- return (this.replyTimeout >= 0) ? this.replyMessageCorrelator.getReply(message.getId(), this.replyTimeout) :
- this.replyMessageCorrelator.getReply(message.getId());
- }
-
- private Message> sendAndReceiveWithTemporaryChannel(Message> message) {
- RendezvousChannel temporaryChannel = new RendezvousChannel();
- Message> requestMessage = MessageBuilder.fromMessage(message)
- .setReturnAddress(temporaryChannel).build();
- this.send(requestMessage);
- return this.receiveResponse(temporaryChannel);
- }
-
- private Message> receiveResponse(MessageChannel channel) {
- return (this.replyTimeout >= 0) ? channel.receive(this.replyTimeout) : channel.receive();
- }
-
- private void registerReplyMessageCorrelator() {
- synchronized (this.replyMessageCorrelatorMonitor) {
- if (this.replyMessageCorrelator != null) {
- return;
- }
- if (this.endpointRegistry == null) {
- throw new ConfigurationException("No EndpointRegistry available. Cannot register ResponseCorrelator.");
- }
- ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(10);
- HandlerEndpoint endpoint = new HandlerEndpoint(correlator);
- endpoint.setSource(this.replyChannel);
- endpoint.setName("internal.correlator." + this);
- this.endpointRegistry.registerEndpoint(endpoint);
- this.replyMessageCorrelator = correlator;
- }
- }
-
-
- private static class ReplyHandlingChannelAdapter implements MessageChannel {
-
- private final MessageTarget target;
-
-
- ReplyHandlingChannelAdapter(MessageTarget target) {
- this.target = target;
- }
-
-
- public List> clear() {
- return null;
- }
-
- public String getName() {
- return null;
- }
-
- public List> purge(MessageSelector selector) {
- return null;
- }
-
- public void setName(String name) {
- }
-
- public Message> receive() {
- return null;
- }
-
- public Message> receive(long timeout) {
- return null;
- }
-
- public boolean send(Message> message) {
- this.target.send(message);
- return true;
- }
-
- public boolean send(Message> message, long timeout) {
- return this.send(message);
- }
-
- }
-
-}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java
index 38ba78542a..abaf164096 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java
@@ -16,11 +16,18 @@
package org.springframework.integration.gateway;
+import org.springframework.integration.ConfigurationException;
+import org.springframework.integration.bus.MessageBus;
+import org.springframework.integration.bus.MessageBusAware;
import org.springframework.integration.channel.MessageChannel;
+import org.springframework.integration.endpoint.EndpointRegistry;
+import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.endpoint.MessagingGateway;
+import org.springframework.integration.handler.ReplyMessageCorrelator;
import org.springframework.integration.message.DefaultMessageCreator;
import org.springframework.integration.message.DefaultMessageMapper;
import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageCreator;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageMapper;
@@ -35,15 +42,29 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public class SimpleMessagingGateway extends MessagingGatewaySupport implements MessagingGateway {
+public class SimpleMessagingGateway extends MessagingGatewaySupport implements MessagingGateway, MessageBusAware {
- private MessageCreator messageCreator = new DefaultMessageCreator();
+ private volatile MessageChannel requestChannel;
- private MessageMapper messageMapper = new DefaultMessageMapper();
+ private volatile MessageChannel replyChannel;
+
+ private volatile long replyTimeout = 5000;
+
+ private volatile int replyMapCapacity = 100;
+
+ private volatile MessageCreator messageCreator = new DefaultMessageCreator();
+
+ private volatile MessageMapper messageMapper = new DefaultMessageMapper();
+
+ private volatile ReplyMessageCorrelator replyMessageCorrelator;
+
+ private volatile EndpointRegistry endpointRegistry;
+
+ private final Object replyMessageCorrelatorMonitor = new Object();
public SimpleMessagingGateway(MessageChannel requestChannel) {
- super(requestChannel);
+ this.requestChannel = requestChannel;
}
public SimpleMessagingGateway() {
@@ -51,6 +72,33 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M
}
+ /**
+ * Set the request channel.
+ *
+ * @param requestChannel the channel to which request messages will be sent
+ */
+ public void setRequestChannel(MessageChannel requestChannel) {
+ this.requestChannel = requestChannel;
+ }
+
+ /**
+ * Set the reply channel. If no reply channel is provided, this template will
+ * always use an anonymous, temporary channel for handling replies.
+ *
+ * @param replyChannel the channel from which reply messages will be received
+ */
+ public void setReplyChannel(MessageChannel replyChannel) {
+ this.replyChannel = replyChannel;
+ }
+
+ /**
+ * Set the max capacity for the map that is used to store replies
+ * until requested by the correlationId. The default value is 100.
+ */
+ public void setReplyMapCapacity(int replyMapCapacity) {
+ this.replyMapCapacity = replyMapCapacity;
+ }
+
public void setMessageCreator(MessageCreator, ?> messageCreator) {
Assert.notNull(messageCreator, "messageCreator must not be null");
this.messageCreator = messageCreator;
@@ -61,21 +109,47 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M
this.messageMapper = messageMapper;
}
+ public void setMessageBus(MessageBus messageBus) {
+ this.endpointRegistry = messageBus;
+ }
+
+ public void setReplyTimeout(long replyTimeout) {
+ this.replyTimeout = replyTimeout;
+ super.setReplyTimeout(replyTimeout);
+ }
+
public void send(Object object) {
+ if (this.requestChannel == null) {
+ throw new IllegalStateException(
+ "send is not supported, because no request channel has been configured");
+ }
Message> message = (object instanceof Message) ? (Message) object :
this.messageCreator.createMessage(object);
if (message != null) {
- if (!this.getRequestReplyTemplate().send(message)) {
+ if (!this.getMessageExchangeTemplate().send(message, this.requestChannel)) {
throw new MessageDeliveryException(message, "failed to send Message to channel");
}
}
}
public Object receive() {
- Message> message = this.getRequestReplyTemplate().receive();
+ if (this.replyChannel == null) {
+ throw new IllegalStateException(
+ "no-arg receive is not supported, because no reply channel has been configured");
+ }
+ Message> message = this.getMessageExchangeTemplate().receive(this.replyChannel);
return (message != null) ? this.messageMapper.mapMessage(message) : null;
}
+ public void receiveAndForward() {
+ if (this.replyChannel == null || this.requestChannel == null) {
+ throw new IllegalStateException(
+ "receiveAndForward is not supported, because either the request or reply channel"
+ + " has not been configured");
+ }
+ this.getMessageExchangeTemplate().receiveAndForward(this.replyChannel, this.requestChannel);
+ }
+
public Object sendAndReceive(Object object) {
return this.sendAndReceive(object, true);
}
@@ -90,11 +164,52 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M
if (request == null) {
return null;
}
- Message> reply = this.getRequestReplyTemplate().request(request);
+ Message> reply = this.sendAndReceiveMessage(request);
if (!shouldMapMessage) {
return reply;
}
return (reply != null) ? this.messageMapper.mapMessage(reply) : null;
}
+ private Message> sendAndReceiveMessage(Message> message) {
+ if (this.requestChannel == null) {
+ throw new MessageDeliveryException(message,
+ "No request channel available. Cannot send request message.");
+ }
+ if (this.replyChannel != null) {
+ return this.sendAndReceiveWithReplyMessageCorrelator(message);
+ }
+ else {
+ return this.getMessageExchangeTemplate().sendAndReceive(message, this.requestChannel);
+ }
+ }
+
+ private Message> sendAndReceiveWithReplyMessageCorrelator(Message> message) {
+ if (this.replyMessageCorrelator == null) {
+ this.registerReplyMessageCorrelator();
+ }
+ message = MessageBuilder.fromMessage(message).setReturnAddress(this.replyChannel).build();
+ this.send(message);
+ return (this.replyTimeout >= 0)
+ ? this.replyMessageCorrelator.getReply(message.getId(), this.replyTimeout)
+ : this.replyMessageCorrelator.getReply(message.getId());
+ }
+
+ private void registerReplyMessageCorrelator() {
+ synchronized (this.replyMessageCorrelatorMonitor) {
+ if (this.replyMessageCorrelator != null) {
+ return;
+ }
+ if (this.endpointRegistry == null) {
+ throw new ConfigurationException("No EndpointRegistry available. Cannot register ReplyMessageCorrelator.");
+ }
+ ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(this.replyMapCapacity);
+ HandlerEndpoint endpoint = new HandlerEndpoint(correlator);
+ endpoint.setSource(this.replyChannel);
+ endpoint.setName("internal.correlator." + this);
+ this.endpointRegistry.registerEndpoint(endpoint);
+ this.replyMessageCorrelator = correlator;
+ }
+ }
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/util/BoundedHashMap.java b/org.springframework.integration/src/main/java/org/springframework/integration/util/BoundedHashMap.java
index 0bd5df50d0..a67a690d87 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/util/BoundedHashMap.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/util/BoundedHashMap.java
@@ -19,6 +19,9 @@ package org.springframework.integration.util;
import java.util.LinkedHashMap;
import java.util.Map.Entry;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.springframework.util.Assert;
/**
@@ -28,6 +31,8 @@ import org.springframework.util.Assert;
*/
public class BoundedHashMap extends LinkedHashMap {
+ private static final Log logger = LogFactory.getLog(BoundedHashMap.class);
+
private final int capacity;
@@ -39,7 +44,11 @@ public class BoundedHashMap extends LinkedHashMap {
@Override
protected boolean removeEldestEntry(Entry eldest) {
- return this.size() > this.capacity;
+ boolean shouldRemove = this.size() > this.capacity;
+ if (shouldRemove && logger.isDebugEnabled()) {
+ logger.debug("removing eldest entry from BoundedHashMap with capacity of " + this.capacity);
+ }
+ return shouldRemove;
}
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java
index f1bb30c683..590696af3e 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java
@@ -19,12 +19,19 @@ package org.springframework.integration.gateway;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
@@ -80,7 +87,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "456");
- ((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
+ ((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -113,6 +120,39 @@ public class GatewayProxyFactoryBeanTests {
assertEquals(1, interceptor.getReceivedCount());
}
+ @Test
+ public void testMultipleMessagesWithResponseCorrelator() throws InterruptedException {
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
+ "gatewayWithResponseCorrelator.xml", GatewayProxyFactoryBeanTests.class);
+ final TestService service = (TestService) context.getBean("proxy");
+ final String[] results = new String[100];
+ final CountDownLatch latch = new CountDownLatch(100);
+ Executor executor = Executors.newFixedThreadPool(25);
+ for (int i = 0; i < 100; i++) {
+ final int count = i;
+ executor.execute(new Runnable() {
+ public void run() {
+ // add some randomness to the ordering of requests
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ }
+ catch (InterruptedException e) {
+ // ignore
+ }
+ results[count] = service.requestReply("test-" + count);
+ latch.countDown();
+ }
+ });
+ }
+ latch.await(5, TimeUnit.SECONDS);
+ for (int i = 0; i < 100; i++) {
+ assertEquals("test-" + i + "!!!", results[i]);
+ }
+ TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor");
+ assertEquals(100, interceptor.getSentCount());
+ assertEquals(100, interceptor.getReceivedCount());
+ }
+
@Test
public void testMessageAsMethodArgument() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
@@ -133,7 +173,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
- ((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
+ ((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -177,7 +217,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
- ((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
+ ((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/RequestReplyTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java
similarity index 69%
rename from org.springframework.integration/src/test/java/org/springframework/integration/gateway/RequestReplyTemplateTests.java
rename to org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java
index 558c42e1a1..302c8fc5b1 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/RequestReplyTemplateTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java
@@ -14,7 +14,7 @@
* limitations under the License.
*/
-package org.springframework.integration.gateway;
+package org.springframework.integration.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -31,18 +31,20 @@ import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
+import org.springframework.integration.message.MessageBuilder;
+import org.springframework.integration.message.MessageExchangeTemplate;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
-public class RequestReplyTemplateTests {
+public class MessageExchangeTemplateTests {
private final QueueChannel requestChannel = new QueueChannel();
- public RequestReplyTemplateTests() {
+ public MessageExchangeTemplateTests() {
MessageHandler testHandler = new MessageHandler() {
public Message> handle(Message> message) {
return new StringMessage(message.getPayload().toString().toUpperCase());
@@ -56,14 +58,14 @@ public class RequestReplyTemplateTests {
@Test
- public void testSynchronousRequestReply() {
- RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
- Message> reply = template.request(new StringMessage("test"));
+ public void testSendAndReceive() {
+ MessageExchangeTemplate template = new MessageExchangeTemplate();
+ Message> reply = template.sendAndReceive(new StringMessage("test"), this.requestChannel);
assertEquals("TEST", reply.getPayload());
}
@Test
- public void testAsynchronousRequestAndReply() throws InterruptedException {
+ public void testSendWithReturnAddress() throws InterruptedException {
final List replies = new ArrayList(3);
final CountDownLatch latch = new CountDownLatch(3);
MessageTarget replyTarget = new MessageTarget() {
@@ -73,10 +75,13 @@ public class RequestReplyTemplateTests {
return true;
}
};
- RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
- template.request(new StringMessage("test1"), replyTarget);
- template.request(new StringMessage("test2"), replyTarget);
- template.request(new StringMessage("test3"), replyTarget);
+ MessageExchangeTemplate template = new MessageExchangeTemplate();
+ Message message1 = MessageBuilder.fromPayload("test1").setReturnAddress(replyTarget).build();
+ Message message2 = MessageBuilder.fromPayload("test2").setReturnAddress(replyTarget).build();
+ Message message3 = MessageBuilder.fromPayload("test3").setReturnAddress(replyTarget).build();
+ template.send(message1, this.requestChannel);
+ template.send(message2, this.requestChannel);
+ template.send(message3, this.requestChannel);
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertTrue(replies.contains("TEST1"));