MessageFilter is now an endpoint.

This commit is contained in:
Mark Fisher
2008-09-05 21:04:01 +00:00
parent fa05d14865
commit e75fbeef76
10 changed files with 151 additions and 40 deletions

View File

@@ -287,7 +287,7 @@ public class ServiceActivatorEndpointTests {
}
@Test
public void correlationId() {
public void correlationIdNotSetIfMessageIsReturnedUnaltered() {
QueueChannel replyChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new Object() {
@SuppressWarnings("unused")
@@ -299,6 +299,22 @@ public class ServiceActivatorEndpointTests {
.setReturnAddress(replyChannel).build();
endpoint.send(message);
Message<?> reply = replyChannel.receive(500);
assertNull(reply.getHeaders().getCorrelationId());
}
@Test
public void correlationIdSetForReplyMessage() {
QueueChannel replyChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new Object() {
@SuppressWarnings("unused")
public Message<?> handle(Message<?> message) {
return MessageBuilder.fromMessage(message).build();
}
}, "handle");
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
endpoint.send(message);
Message<?> reply = replyChannel.receive(500);
assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId());
}

View File

@@ -124,7 +124,7 @@ public class GatewayProxyFactoryBeanTests {
public void testMultipleMessagesWithResponseCorrelator() throws InterruptedException {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"gatewayWithResponseCorrelator.xml", GatewayProxyFactoryBeanTests.class);
int numRequests = 25;
int numRequests = 5;
final TestService service = (TestService) context.getBean("proxy");
final String[] results = new String[numRequests];
final CountDownLatch latch = new CountDownLatch(numRequests);

View File

@@ -21,7 +21,9 @@ import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
@@ -39,22 +41,28 @@ public class CorrelationIdTests {
Object correlationId = "123-ABC";
Message<String> message = MessageBuilder.fromPayload("test")
.setCorrelationId(correlationId).build();
DefaultMessageHandler handler = new DefaultMessageHandler();
handler.setObject(new TestBean());
handler.setMethodName("upperCase");
handler.afterPropertiesSet();
Message<?> reply = handler.handle(message);
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(correlationId, reply.getHeaders().getCorrelationId());
}
@Test
public void testCorrelationIdCopiedFromMessageIdByDefault() {
Message<String> message = MessageBuilder.fromPayload("test").build();
DefaultMessageHandler handler = new DefaultMessageHandler();
handler.setObject(new TestBean());
handler.setMethodName("upperCase");
handler.afterPropertiesSet();
Message<?> reply = handler.handle(message);
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId());
}
@@ -62,11 +70,14 @@ public class CorrelationIdTests {
public void testCorrelationIdCopiedFromMessageCorrelationIdIfAvailable() {
Message<String> message = MessageBuilder.fromPayload("test")
.setCorrelationId("correlationId").build();
DefaultMessageHandler handler = new DefaultMessageHandler();
handler.setObject(new TestBean());
handler.setMethodName("upperCase");
handler.afterPropertiesSet();
Message<?> reply = handler.handle(message);
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(message.getHeaders().getCorrelationId(), reply.getHeaders().getCorrelationId());
assertTrue(message.getHeaders().getCorrelationId().equals(reply.getHeaders().getCorrelationId()));
}
@@ -76,22 +87,28 @@ public class CorrelationIdTests {
Object correlationId = "123-ABC";
Message<String> message = MessageBuilder.fromPayload("test")
.setCorrelationId(correlationId).build();
DefaultMessageHandler handler = new DefaultMessageHandler();
handler.setObject(new TestBean());
handler.setMethodName("createMessage");
handler.afterPropertiesSet();
Message<?> reply = handler.handle(message);
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
endpoint.setSource(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals("456-XYZ", reply.getHeaders().getCorrelationId());
}
@Test
public void testCorrelationNotCopiedFromRequestMessgeIdIfAlreadySetByHandler() throws Exception {
Message<?> message = new StringMessage("test");
DefaultMessageHandler handler = new DefaultMessageHandler();
handler.setObject(new TestBean());
handler.setMethodName("createMessage");
handler.afterPropertiesSet();
Message<?> reply = handler.handle(message);
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
endpoint.setSource(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals("456-XYZ", reply.getHeaders().getCorrelationId());
}

View File

@@ -17,10 +17,14 @@
package org.springframework.integration.handler;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
@@ -31,7 +35,7 @@ import org.springframework.integration.message.selector.MessageSelector;
public class MessageFilterTests {
@Test
public void testFilterAcceptsMessage() {
public void filterAcceptsMessage() {
MessageFilter filter = new MessageFilter(new MessageSelector() {
public boolean accept(Message<?> message) {
return true;
@@ -42,7 +46,7 @@ public class MessageFilterTests {
}
@Test
public void testFilterRejectsMessage() {
public void filterRejectsMessage() {
MessageFilter filter = new MessageFilter(new MessageSelector() {
public boolean accept(Message<?> message) {
return false;
@@ -51,4 +55,40 @@ public class MessageFilterTests {
assertNull(filter.handle(new StringMessage("test")));
}
@Test
public void filterAcceptsWithChannels() {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel();
MessageFilter filter = new MessageFilter(new MessageSelector() {
public boolean accept(Message<?> message) {
return true;
}
});
filter.setSource(inputChannel);
filter.setOutputChannel(outputChannel);
filter.afterPropertiesSet();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertNotNull(reply);
assertEquals(message, reply);
}
@Test
public void filterRejectsWithChannels() {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel();
MessageFilter filter = new MessageFilter(new MessageSelector() {
public boolean accept(Message<?> message) {
return false;
}
});
filter.setSource(inputChannel);
filter.setOutputChannel(outputChannel);
filter.afterPropertiesSet();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));
assertNull(outputChannel.receive(0));
}
}