ReplyMessageCorrelator no longer delegates to a MessageStore. Instead, it simply passes to the Message's 'returnAddress' header (INT-366).

This commit is contained in:
Mark Fisher
2008-09-11 14:22:07 +00:00
parent 0f9dc8c750
commit dfd6d42346
7 changed files with 41 additions and 129 deletions

View File

@@ -124,7 +124,7 @@ public class GatewayProxyFactoryBeanTests {
public void testMultipleMessagesWithResponseCorrelator() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"gatewayWithResponseCorrelator.xml", GatewayProxyFactoryBeanTests.class);
int numRequests = 5;
int numRequests = 500;
final TestService service = (TestService) context.getBean("proxy");
final String[] results = new String[numRequests];
final CountDownLatch latch = new CountDownLatch(numRequests);

View File

@@ -64,7 +64,7 @@ public class GatewayParserTests {
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReply");
String result = service.requestReply("foo");
assertEquals("foobar", result);
assertEquals("foo", result);
}
@Test
@@ -75,7 +75,7 @@ public class GatewayParserTests {
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReplyWithMessageMapper");
String result = service.requestReply("foo");
assertEquals("foobar.mapped", result);
assertEquals("foo.mapped", result);
}
@Test
@@ -86,7 +86,7 @@ public class GatewayParserTests {
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReplyWithMessageCreator");
String result = service.requestReply("foo");
assertEquals("created.foobar", result);
assertEquals("created.foo", result);
}
@@ -94,7 +94,7 @@ public class GatewayParserTests {
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
Message<?> request = requestChannel.receive();
Message<String> reply = MessageBuilder.fromPayload(request.getPayload() + "bar")
Message<?> reply = MessageBuilder.fromMessage(request)
.setCorrelationId(request.getHeaders().getId()).build();
replyChannel.send(reply);
}

View File

@@ -18,7 +18,7 @@
</interceptors>
</channel>
<service-activator ref="handler" input-channel="requestChannel"/>
<service-activator ref="handler" input-channel="requestChannel" output-channel="replyChannel"/>
<beans:bean id="proxy" class="org.springframework.integration.gateway.GatewayProxyFactoryBean">
<beans:property name="serviceInterface" value="org.springframework.integration.gateway.TestService"/>

View File

@@ -18,14 +18,9 @@ package org.springframework.integration.handler;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
@@ -36,47 +31,13 @@ public class ReplyMessageCorrelatorTests {
@Test
public void testReceiversPrecedeReply() throws InterruptedException {
final ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(10);
final AtomicInteger replyCounter = new AtomicInteger();
CountDownLatch latch = startReceivers(correlator, replyCounter, 5, 500);
final ReplyMessageCorrelator correlator = new ReplyMessageCorrelator();
QueueChannel replyChannel = new QueueChannel();
Message<String> message = MessageBuilder.fromPayload("test")
.setCorrelationId("123").build();
.setCorrelationId("123").setReturnAddress(replyChannel).build();
correlator.handle(message);
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertEquals(1, replyCounter.get());
}
@Test
public void testReplyPrecedeReceivers() throws InterruptedException {
final ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(10);
Message<String> message = MessageBuilder.fromPayload("test")
.setCorrelationId("123").build();
correlator.handle(message);
final AtomicInteger replyCounter = new AtomicInteger();
CountDownLatch latch = startReceivers(correlator, replyCounter, 5, 50);
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertEquals(1, replyCounter.get());
}
private static CountDownLatch startReceivers(final ReplyMessageCorrelator correlator,
final AtomicInteger replyCounter, int numReceivers, final long timeout) {
final CountDownLatch latch = new CountDownLatch(numReceivers);
Executor executor = Executors.newFixedThreadPool(numReceivers);
for (int i = 0; i < numReceivers; i++) {
executor.execute(new Runnable() {
public void run() {
Message<?> reply = correlator.getReply("123", timeout);
if (reply != null) {
replyCounter.incrementAndGet();
}
latch.countDown();
}
});
}
return latch;
Message<?> reply = replyChannel.receive(0);
assertEquals("test", reply.getPayload());
}
}