Replacing RequestReplyTemplate with MessageExchangeTemplate. The ReplyMessageCorrelator delegation is now within the SimpleMessagingGateway.

This commit is contained in:
Mark Fisher
2008-07-30 03:19:57 +00:00
parent a8075554ed
commit 60cdfe4ffe
8 changed files with 218 additions and 325 deletions

View File

@@ -19,12 +19,19 @@ package org.springframework.integration.gateway;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
@@ -80,7 +87,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "456");
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -113,6 +120,39 @@ public class GatewayProxyFactoryBeanTests {
assertEquals(1, interceptor.getReceivedCount());
}
@Test
public void testMultipleMessagesWithResponseCorrelator() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"gatewayWithResponseCorrelator.xml", GatewayProxyFactoryBeanTests.class);
final TestService service = (TestService) context.getBean("proxy");
final String[] results = new String[100];
final CountDownLatch latch = new CountDownLatch(100);
Executor executor = Executors.newFixedThreadPool(25);
for (int i = 0; i < 100; i++) {
final int count = i;
executor.execute(new Runnable() {
public void run() {
// add some randomness to the ordering of requests
try {
Thread.sleep(new Random().nextInt(100));
}
catch (InterruptedException e) {
// ignore
}
results[count] = service.requestReply("test-" + count);
latch.countDown();
}
});
}
latch.await(5, TimeUnit.SECONDS);
for (int i = 0; i < 100; i++) {
assertEquals("test-" + i + "!!!", results[i]);
}
TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor");
assertEquals(100, interceptor.getSentCount());
assertEquals(100, interceptor.getReceivedCount());
}
@Test
public void testMessageAsMethodArgument() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
@@ -133,7 +173,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
@@ -177,7 +217,7 @@ public class GatewayProxyFactoryBeanTests {
public void run() {
Message<?> input = requestChannel.receive();
StringMessage response = new StringMessage(input.getPayload() + "bar");
((MessageChannel) input.getHeaders().getReturnAddress()).send(response);
((MessageTarget) input.getHeaders().getReturnAddress()).send(response);
}
}).start();
}

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.integration.gateway;
package org.springframework.integration.message;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -31,18 +31,20 @@ import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageExchangeTemplate;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class RequestReplyTemplateTests {
public class MessageExchangeTemplateTests {
private final QueueChannel requestChannel = new QueueChannel();
public RequestReplyTemplateTests() {
public MessageExchangeTemplateTests() {
MessageHandler testHandler = new MessageHandler() {
public Message<?> handle(Message<?> message) {
return new StringMessage(message.getPayload().toString().toUpperCase());
@@ -56,14 +58,14 @@ public class RequestReplyTemplateTests {
@Test
public void testSynchronousRequestReply() {
RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
Message<?> reply = template.request(new StringMessage("test"));
public void testSendAndReceive() {
MessageExchangeTemplate template = new MessageExchangeTemplate();
Message<?> reply = template.sendAndReceive(new StringMessage("test"), this.requestChannel);
assertEquals("TEST", reply.getPayload());
}
@Test
public void testAsynchronousRequestAndReply() throws InterruptedException {
public void testSendWithReturnAddress() throws InterruptedException {
final List<String> replies = new ArrayList<String>(3);
final CountDownLatch latch = new CountDownLatch(3);
MessageTarget replyTarget = new MessageTarget() {
@@ -73,10 +75,13 @@ public class RequestReplyTemplateTests {
return true;
}
};
RequestReplyTemplate template = new RequestReplyTemplate(requestChannel);
template.request(new StringMessage("test1"), replyTarget);
template.request(new StringMessage("test2"), replyTarget);
template.request(new StringMessage("test3"), replyTarget);
MessageExchangeTemplate template = new MessageExchangeTemplate();
Message<String> message1 = MessageBuilder.fromPayload("test1").setReturnAddress(replyTarget).build();
Message<String> message2 = MessageBuilder.fromPayload("test2").setReturnAddress(replyTarget).build();
Message<String> message3 = MessageBuilder.fromPayload("test3").setReturnAddress(replyTarget).build();
template.send(message1, this.requestChannel);
template.send(message2, this.requestChannel);
template.send(message3, this.requestChannel);
latch.await(2000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertTrue(replies.contains("TEST1"));