Split MessageSource types into 2 sub-interfaces: PollableSource and SubscribableSource. The MessageChannel hierarchy has also been revised accordingly. DirectChannel and PublishSubscribeChannel are now SubscribableSources, while the other queue-based channels are PollableSources. The PollableChannel interface extends BlockingSource which in turn is an extension of PollableSource that adds timeout-aware methods.

This commit is contained in:
Mark Fisher
2008-07-30 20:48:00 +00:00
parent 759b5f6d0e
commit fa58dc9457
77 changed files with 422 additions and 497 deletions

View File

@@ -23,7 +23,6 @@ import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -34,7 +33,7 @@ public class MessagePublishingInterceptorTests {
@Test
public void testNonNullReturnValuePublishedWithDefaultChannel() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor();
interceptor.setDefaultChannel(channel);
TestService proxy = (TestService) this.createProxy(new TestServiceImpl("hello world"), interceptor);
@@ -46,7 +45,7 @@ public class MessagePublishingInterceptorTests {
@Test
public void testNullReturnValueNotPublished() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor();
interceptor.setDefaultChannel(channel);
TestService proxy = (TestService) this.createProxy(new TestServiceImpl(null), interceptor);
@@ -56,7 +55,7 @@ public class MessagePublishingInterceptorTests {
@Test
public void testVoidReturnValueNotPublished() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor();
interceptor.setDefaultChannel(channel);
TestService proxy = (TestService) this.createProxy(new TestServiceImpl(null), interceptor);

View File

@@ -26,7 +26,6 @@ import org.springframework.aop.framework.ProxyFactory;
import org.springframework.integration.annotation.Publisher;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.DefaultChannelRegistry;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
@@ -37,7 +36,7 @@ public class PublisherAnnotationAdvisorTests {
@Test
public void testPublisherAnnotation() {
final MessageChannel channel = new QueueChannel();
final QueueChannel channel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("testChannel", channel);
PublisherAnnotationAdvisor advisor = new PublisherAnnotationAdvisor(channelRegistry);
@@ -50,7 +49,7 @@ public class PublisherAnnotationAdvisorTests {
@Test
public void testNoPublisherAnnotation() {
final MessageChannel channel = new QueueChannel();
final QueueChannel channel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("testChannel", channel);
PublisherAnnotationAdvisor advisor = new PublisherAnnotationAdvisor(channelRegistry);

View File

@@ -22,7 +22,7 @@ import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
/**
@@ -36,7 +36,7 @@ public class PublisherAnnotationPostProcessorTests {
"publisherAnnotationPostProcessorTests.xml", this.getClass());
ITestBean testBean = (ITestBean) context.getBean("testBean");
testBean.test();
MessageChannel channel = (MessageChannel) context.getBean("testChannel");
PollableChannel channel = (PollableChannel) context.getBean("testChannel");
Message<?> result = channel.receive();
assertEquals("test", result.getPayload());
}

View File

@@ -29,6 +29,7 @@ import org.junit.Test;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dispatcher.PublishSubscribeChannel;
import org.springframework.integration.endpoint.SourceEndpoint;
@@ -38,6 +39,7 @@ import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.PollingSchedule;
@@ -49,8 +51,8 @@ public class DefaultMessageBusTests {
@Test
public void testRegistrationWithInputChannelReference() {
DefaultMessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
MessageChannel targetChannel = new QueueChannel();
QueueChannel sourceChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
bus.registerChannel("sourceChannel", sourceChannel);
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress("targetChannel").build();
@@ -71,8 +73,8 @@ public class DefaultMessageBusTests {
@Test
public void testRegistrationWithInputChannelName() {
MessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
MessageChannel targetChannel = new QueueChannel();
QueueChannel sourceChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
bus.registerChannel("sourceChannel", sourceChannel);
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress("targetChannel").build();
@@ -93,9 +95,9 @@ public class DefaultMessageBusTests {
@Test
public void testChannelsWithoutHandlers() {
MessageBus bus = new DefaultMessageBus();
MessageChannel sourceChannel = new QueueChannel();
QueueChannel sourceChannel = new QueueChannel();
sourceChannel.send(new StringMessage("test"));
MessageChannel targetChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
bus.registerChannel("sourceChannel", sourceChannel);
bus.registerChannel("targetChannel", targetChannel);
bus.start();
@@ -108,9 +110,9 @@ public class DefaultMessageBusTests {
public void testAutodetectionWithApplicationContext() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("messageBusTests.xml", this.getClass());
context.start();
MessageChannel sourceChannel = (MessageChannel) context.getBean("sourceChannel");
PollableChannel sourceChannel = (PollableChannel) context.getBean("sourceChannel");
sourceChannel.send(new GenericMessage<String>("test"));
MessageChannel targetChannel = (MessageChannel) context.getBean("targetChannel");
PollableChannel targetChannel = (PollableChannel) context.getBean("targetChannel");
MessageBus bus = (MessageBus) context.getBean("bus");
bus.start();
Message<?> result = targetChannel.receive(1000);
@@ -198,7 +200,7 @@ public class DefaultMessageBusTests {
bus.registerEndpoint(sourceEndpoint);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = bus.getErrorChannel().receive(100);
Message<?> message = ((PollableChannel) bus.getErrorChannel()).receive(5000);
assertNotNull("message should not be null", message);
assertTrue(message instanceof ErrorMessage);
assertEquals("intentional test failure", ((ErrorMessage) message).getPayload().getMessage());
@@ -245,7 +247,7 @@ public class DefaultMessageBusTests {
}
private static class FailingSource implements MessageSource<Object> {
private static class FailingSource implements PollableSource<Object> {
private CountDownLatch latch;

View File

@@ -43,7 +43,7 @@ public class DirectChannelSubscriptionTests {
private MessageChannel sourceChannel = new DirectChannel();
private MessageChannel targetChannel = new ThreadLocalChannel();
private ThreadLocalChannel targetChannel = new ThreadLocalChannel();
@Before

View File

@@ -35,7 +35,7 @@ public class ChannelPurgerTests {
@Test
public void testPurgeAllWithoutSelector() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
channel.send(new StringMessage("test1"));
channel.send(new StringMessage("test2"));
channel.send(new StringMessage("test3"));
@@ -47,7 +47,7 @@ public class ChannelPurgerTests {
@Test
public void testPurgeAllWithSelector() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
channel.send(new StringMessage("test1"));
channel.send(new StringMessage("test2"));
channel.send(new StringMessage("test3"));
@@ -63,7 +63,7 @@ public class ChannelPurgerTests {
@Test
public void testPurgeNoneWithSelector() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
channel.send(new StringMessage("test1"));
channel.send(new StringMessage("test2"));
channel.send(new StringMessage("test3"));
@@ -81,7 +81,7 @@ public class ChannelPurgerTests {
@Test
public void testPurgeSubsetWithSelector() {
MessageChannel channel = new QueueChannel();
QueueChannel channel = new QueueChannel();
channel.send(new StringMessage("test1"));
channel.send(new StringMessage("test2"));
channel.send(new StringMessage("test3"));
@@ -100,8 +100,8 @@ public class ChannelPurgerTests {
@Test
public void testMultipleChannelsWithNoSelector() {
MessageChannel channel1 = new QueueChannel();
MessageChannel channel2 = new QueueChannel();
QueueChannel channel1 = new QueueChannel();
QueueChannel channel2 = new QueueChannel();
channel1.send(new StringMessage("test1"));
channel1.send(new StringMessage("test2"));
channel2.send(new StringMessage("test1"));
@@ -115,8 +115,8 @@ public class ChannelPurgerTests {
@Test
public void testMultipleChannelsWithSelector() {
MessageChannel channel1 = new QueueChannel();
MessageChannel channel2 = new QueueChannel();
QueueChannel channel1 = new QueueChannel();
QueueChannel channel2 = new QueueChannel();
channel1.send(new StringMessage("test1"));
channel1.send(new StringMessage("test2"));
channel1.send(new StringMessage("test3"));
@@ -142,8 +142,8 @@ public class ChannelPurgerTests {
@Test
public void testPurgeNoneWithSelectorAndMultipleChannels() {
MessageChannel channel1 = new QueueChannel();
MessageChannel channel2 = new QueueChannel();
QueueChannel channel1 = new QueueChannel();
QueueChannel channel2 = new QueueChannel();
channel1.send(new StringMessage("test1"));
channel1.send(new StringMessage("test2"));
channel2.send(new StringMessage("test1"));
@@ -163,13 +163,13 @@ public class ChannelPurgerTests {
@Test(expected=IllegalArgumentException.class)
public void testNullChannel() {
MessageChannel channel = null;
QueueChannel channel = null;
new ChannelPurger(channel);
}
@Test(expected=IllegalArgumentException.class)
public void testEmptyChannelArray() {
MessageChannel[] channels = new MessageChannel[0];
QueueChannel[] channels = new QueueChannel[0];
new ChannelPurger(channels);
}

View File

@@ -31,6 +31,7 @@ import org.springframework.beans.FatalBeanException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.TestChannelInterceptor;
import org.springframework.integration.dispatcher.PublishSubscribeChannel;
@@ -140,7 +141,7 @@ public class ChannelParserTests {
public void testChannelInteceptors() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"channelInterceptorParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("channel");
PollableChannel channel = (PollableChannel) context.getBean("channel");
TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor");
assertEquals(0, interceptor.getSendCount());
channel.send(new StringMessage("test"));
@@ -154,7 +155,7 @@ public class ChannelParserTests {
public void testPriorityChannelWithDefaultComparator() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("priorityChannelWithDefaultComparator");
PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithDefaultComparator");
Message<String> lowPriorityMessage = MessageBuilder.fromPayload("low")
.setPriority(MessagePriority.LOW).build();
Message<String> midPriorityMessage = MessageBuilder.fromPayload("mid")
@@ -176,7 +177,7 @@ public class ChannelParserTests {
public void testPriorityChannelWithCustomComparator() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("priorityChannelWithCustomComparator");
PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithCustomComparator");
channel.send(new StringMessage("C"));
channel.send(new StringMessage("A"));
channel.send(new StringMessage("D"));
@@ -195,7 +196,7 @@ public class ChannelParserTests {
public void testPriorityChannelWithIntegerDatatypeEnforced() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"priorityChannelParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("integerOnlyPriorityChannel");
PollableChannel channel = (PollableChannel) context.getBean("integerOnlyPriorityChannel");
channel.send(new GenericMessage<Integer>(3));
channel.send(new GenericMessage<Integer>(2));
channel.send(new GenericMessage<Integer>(1));

View File

@@ -17,13 +17,13 @@
package org.springframework.integration.channel.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class TestSource implements MessageSource<String> {
public class TestSource implements PollableSource<String> {
private final String text;

View File

@@ -18,14 +18,14 @@ package org.springframework.integration.channel.factory;
import java.util.List;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.AbstractPollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.selector.MessageSelector;
/**
* @author Marius Bogoevici
*/
public class StubChannel extends AbstractMessageChannel {
public class StubChannel extends AbstractPollableChannel {
@Override
protected Message<?> doReceive(long timeout) {

View File

@@ -29,6 +29,7 @@ import org.springframework.beans.factory.BeanCreationException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
@@ -114,7 +115,7 @@ public class AggregatorParserTests {
for (Message<?> message : outboundMessages) {
addingAggregator.handle(message);
}
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> response = outputChannel.receive();
Assert.assertEquals(6l, response.getPayload());
}
@@ -144,7 +145,7 @@ public class AggregatorParserTests {
aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null));
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> reply = outputChannel.receive(0);
Assert.assertNull(reply);
aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null));

View File

@@ -27,6 +27,7 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -71,7 +72,7 @@ public class EndpointParserTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithHandlerChainElement.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("testChannel");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel");
channel.send(new StringMessage("test"));
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
@@ -83,7 +84,7 @@ public class EndpointParserTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithSelector.xml", this.getClass());
MessageTarget endpoint = (MessageTarget) context.getBean("endpoint");
MessageChannel replyChannel = new QueueChannel();
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
assertTrue(endpoint.send(message));

View File

@@ -27,6 +27,7 @@ import org.springframework.beans.DirectFieldAccessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.router.ResequencingMessageHandler;
@@ -49,7 +50,7 @@ public class ResequencerParserTests {
public void testResequencing() {
ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context
.getBean("defaultResequencer");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
List<Message<?>> outboundMessages = new ArrayList<Message<?>>();
outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel));
outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel));

View File

@@ -17,13 +17,14 @@
package org.springframework.integration.config;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class TestSource implements MessageSource {
@SuppressWarnings("unchecked")
public class TestSource implements PollableSource {
public Message<?> receive() {
return new StringMessage("test");

View File

@@ -52,6 +52,7 @@ import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.EndpointInterceptor;
import org.springframework.integration.endpoint.HandlerEndpoint;
@@ -101,7 +102,7 @@ public class MessagingAnnotationPostProcessorTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
"handlerAnnotationPostProcessorTests.xml", this.getClass());
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
inputChannel.send(new StringMessage("foo"));
Message<?> reply = outputChannel.receive(1000);
assertEquals("hello foo", reply.getPayload());
@@ -112,7 +113,7 @@ public class MessagingAnnotationPostProcessorTests {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("simpleAnnotatedEndpointTests.xml", this.getClass());
context.start();
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -126,7 +127,7 @@ public class MessagingAnnotationPostProcessorTests {
context.start();
ChannelRegistry channelRegistry = (ChannelRegistry) context.getBean("bus");
MessageChannel inputChannel = channelRegistry.lookupChannel("inputChannel");
MessageChannel outputChannel = channelRegistry.lookupChannel("outputChannel");
PollableChannel outputChannel = (PollableChannel) channelRegistry.lookupChannel("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -138,7 +139,7 @@ public class MessagingAnnotationPostProcessorTests {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("messageParameterAnnotatedEndpointTests.xml", this.getClass());
context.start();
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -150,7 +151,7 @@ public class MessagingAnnotationPostProcessorTests {
AbstractApplicationContext context = new ClassPathXmlApplicationContext("typeConvertingEndpointTests.xml", this.getClass());
context.start();
MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel");
MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel");
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
inputChannel.send(new StringMessage("123"));
Message<?> message = outputChannel.receive(1000);
assertEquals(246, message.getPayload());
@@ -226,7 +227,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.postProcessAfterInitialization(proxy, "proxy");
messageBus.start();
MessageChannel inputChannel = messageBus.lookupChannel("inputChannel");
MessageChannel outputChannel = messageBus.lookupChannel("outputChannel");
PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -241,7 +242,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
messageBus.start();
MessageChannel inputChannel = messageBus.lookupChannel("inputChannel");
MessageChannel outputChannel = messageBus.lookupChannel("outputChannel");
PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -258,7 +259,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.postProcessAfterInitialization(proxy, "proxy");
messageBus.start();
MessageChannel inputChannel = messageBus.lookupChannel("inputChannel");
MessageChannel outputChannel = messageBus.lookupChannel("outputChannel");
PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel");
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
@@ -268,7 +269,7 @@ public class MessagingAnnotationPostProcessorTests {
public void testMessageEndpointAnnotationInheritedFromInterface() {
MessageBus messageBus = new DefaultMessageBus();
MessageChannel inputChannel = new QueueChannel();
MessageChannel outputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
messageBus.registerChannel("inputChannel", inputChannel);
messageBus.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);
@@ -289,7 +290,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
messageBus.start();
MessageChannel inputChannel = messageBus.lookupChannel("inputChannel");
MessageChannel outputChannel = messageBus.lookupChannel("outputChannel");
PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel");
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
@@ -299,7 +300,7 @@ public class MessagingAnnotationPostProcessorTests {
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
MessageBus messageBus = new DefaultMessageBus();
MessageChannel inputChannel = new QueueChannel();
MessageChannel outputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
messageBus.registerChannel("inputChannel", inputChannel);
messageBus.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus);

View File

@@ -17,38 +17,22 @@
package org.springframework.integration.dispatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
/**
* @author Mark Fisher
*/
public class DirectChannelParserTests {
@Test
public void testReceivesNullFromChannelWithoutSource() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"directChannelParserTests.xml", DirectChannelParserTests.class);
DirectChannel channel = (DirectChannel) context.getBean("channelWithoutSource");
assertNull(channel.receive());
}
@Test
public void testReceivesMessageFromChannelWithSource() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"directChannelParserTests.xml", DirectChannelParserTests.class);
DirectChannel channel = (DirectChannel) context.getBean("channelWithSource");
assertFalse(channel.send(new StringMessage("test")));
Message<?> reply = channel.receive();
assertNotNull(reply);
assertEquals("foo", reply.getPayload());
Object channel = context.getBean("channel");
assertEquals(DirectChannel.class, channel.getClass());
}
}

View File

@@ -17,19 +17,14 @@
package org.springframework.integration.dispatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.StringMessage;
@@ -38,9 +33,6 @@ import org.springframework.integration.message.StringMessage;
*/
public class DirectChannelTests {
private static final String HANDLER_THREAD = "handler-thread";
@Test
public void testSend() {
DirectChannel channel = new DirectChannel();
@@ -67,63 +59,6 @@ public class DirectChannelTests {
assertEquals("test-thread", target.threadName);
}
@Test
public void testReceive() {
DirectChannel channel = new DirectChannel(new MessageSource<String>() {
public Message<String> receive() {
return new StringMessage("foo");
}
});
Message<?> message = channel.receive();
assertNotNull(message);
assertNotNull(message.getPayload());
assertEquals(String.class, message.getPayload().getClass());
assertEquals("foo", message.getPayload());
}
@Test
public void testReceiveWithMessageResult() {
DirectChannel channel = new DirectChannel(new MessageReturningTestSource("foo"));
Message<?> message = channel.receive();
assertNotNull(message);
assertNotNull(message.getPayload());
assertEquals(String.class, message.getPayload().getClass());
assertEquals("foo", message.getPayload());
String handlerThreadName = message.getHeaders().get(HANDLER_THREAD, String.class);
assertEquals(Thread.currentThread().getName(), handlerThreadName);
}
@Test
public void testReceiveInSeparateThread() throws InterruptedException {
final DirectChannel channel = new DirectChannel(new MessageReturningTestSource("foo"));
final SynchronousQueue<Message<?>> messageHolder = new SynchronousQueue<Message<?>>();
new Thread(new Runnable() {
public void run() {
Message<?> message = channel.receive();
assertNotNull(message);
try {
messageHolder.put(message);
}
catch (InterruptedException e) {
// will fail after timeout below
}
}
}, "test-thread").start();
Message<?> message = messageHolder.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(message);
assertNotNull(message.getPayload());
assertEquals(String.class, message.getPayload().getClass());
assertEquals("foo", message.getPayload());
String handlerThreadName = message.getHeaders().get(HANDLER_THREAD, String.class);
assertEquals("test-thread", handlerThreadName);
}
@Test
public void testReceiveWithNoSource() {
DirectChannel channel = new DirectChannel();
assertNull(channel.receive());
}
private static class ThreadNameExtractingTestTarget implements MessageTarget {
@@ -149,20 +84,4 @@ public class DirectChannelTests {
}
}
private static class MessageReturningTestSource implements MessageSource<String> {
private final String messageText;
MessageReturningTestSource(String messageText) {
this.messageText = messageText;
}
public Message<String> receive() {
return MessageBuilder.fromPayload(messageText)
.setHeader(HANDLER_THREAD, Thread.currentThread().getName()).build();
}
}
}

View File

@@ -7,12 +7,6 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<direct-channel id="channelWithoutSource"/>
<direct-channel id="channel"/>
<direct-channel id="channelWithSource" source="testSource"/>
<beans:bean id="testSource" class="org.springframework.integration.channel.config.TestSource">
<beans:constructor-arg value="foo"/>
</beans:bean>
</beans:beans>

View File

@@ -30,7 +30,6 @@ import org.junit.Test;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.DefaultChannelRegistry;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.TestHandlers;
@@ -48,7 +47,7 @@ public class HandlerEndpointTests {
@Test
public void testDefaultReplyChannel() throws Exception {
MessageChannel replyChannel = new QueueChannel();
QueueChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
MessageHandler handler = new MessageHandler() {
@@ -67,7 +66,7 @@ public class HandlerEndpointTests {
@Test
public void testExplicitReplyChannel() throws Exception {
final MessageChannel replyChannel = new QueueChannel();
final QueueChannel replyChannel = new QueueChannel();
MessageHandler handler = new MessageHandler() {
public Message<?> handle(Message<?> message) {
return new StringMessage("hello " + message.getPayload());
@@ -84,7 +83,7 @@ public class HandlerEndpointTests {
@Test
public void testExplicitReplyChannelName() throws Exception {
final MessageChannel replyChannel = new QueueChannel();
final QueueChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
MessageHandler handler = new MessageHandler() {
@@ -104,8 +103,8 @@ public class HandlerEndpointTests {
@Test
public void testDynamicReplyChannel() throws Exception {
final MessageChannel replyChannel1 = new QueueChannel();
final MessageChannel replyChannel2 = new QueueChannel();
final QueueChannel replyChannel1 = new QueueChannel();
final QueueChannel replyChannel2 = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel2", replyChannel2);
MessageHandler handler = new MessageHandler() {
@@ -135,7 +134,7 @@ public class HandlerEndpointTests {
@Test
public void testHandlerReturnsNull() throws InterruptedException {
MessageChannel replyChannel = new QueueChannel();
QueueChannel replyChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("replyChannel", replyChannel);
final CountDownLatch latch = new CountDownLatch(1);

View File

@@ -23,6 +23,7 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
@@ -37,7 +38,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel1 = (MessageChannel) context.getBean("channel1WithOverride");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel");
context.start();
Message<String> message = MessageBuilder.fromPayload("*")
.setReturnAddress("replyChannel").build();
@@ -52,7 +53,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel1 = (MessageChannel) context.getBean("channel1");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel");
context.start();
Message<String> message = MessageBuilder.fromPayload("*")
.setReturnAddress("replyChannel").build();
@@ -67,7 +68,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel4 = (MessageChannel) context.getBean("channel4");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel");
context.start();
StringMessage message = new StringMessage("*");
channel4.send(message);
@@ -81,7 +82,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel3 = (MessageChannel) context.getBean("channel3");
MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel");
PollableChannel errorChannel = (PollableChannel) context.getBean("customErrorChannel");
context.start();
StringMessage message = new StringMessage("*");
channel3.send(message);
@@ -94,7 +95,7 @@ public class ReturnAddressTests {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"returnAddressTests.xml", this.getClass());
MessageChannel channel3 = (MessageChannel) context.getBean("channel3WithOverride");
MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel");
PollableChannel errorChannel = (PollableChannel) context.getBean("customErrorChannel");
context.start();
StringMessage message = new StringMessage("*");
channel3.send(message);

View File

@@ -26,7 +26,7 @@ import org.junit.Test;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.PollableSource;
/**
* @author Mark Fisher
@@ -47,7 +47,7 @@ public class SourceEndpointTests {
}
private static class TestSource implements MessageSource<String> {
private static class TestSource implements PollableSource<String> {
private String message;

View File

@@ -24,6 +24,7 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -44,7 +45,7 @@ public class TransactionInterceptorTests {
"transactionInterceptorTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
MessageChannel input = (MessageChannel) context.getBean("goodInput");
MessageChannel output = (MessageChannel) context.getBean("output");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
assertEquals(0, txManager.getRollbackCount());
input.send(new StringMessage("test"));
@@ -61,7 +62,7 @@ public class TransactionInterceptorTests {
"transactionInterceptorTests.xml", this.getClass());
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
MessageChannel input = (MessageChannel) context.getBean("badInput");
MessageChannel output = (MessageChannel) context.getBean("output");
PollableChannel output = (PollableChannel) context.getBean("output");
assertEquals(0, txManager.getCommitCount());
assertEquals(0, txManager.getRollbackCount());
input.send(new StringMessage("test"));

View File

@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
@@ -41,7 +41,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testRequestReplyWithAnonymousChannel() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
QueueChannel requestChannel = new QueueChannel();
startResponder(requestChannel);
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
proxyFactory.setRequestChannel(requestChannel);
@@ -54,7 +54,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testOneWay() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
final QueueChannel requestChannel = new QueueChannel();
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
proxyFactory.setServiceInterface(TestService.class);
proxyFactory.setRequestChannel(requestChannel);
@@ -68,7 +68,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testSolicitResponse() throws Exception {
MessageChannel replyChannel = new QueueChannel();
QueueChannel replyChannel = new QueueChannel();
replyChannel.send(new StringMessage("foo"));
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
proxyFactory.setServiceInterface(TestService.class);
@@ -82,7 +82,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testRequestReplyWithTypeConversion() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
final QueueChannel requestChannel = new QueueChannel();
new Thread(new Runnable() {
public void run() {
Message<?> input = requestChannel.receive();
@@ -155,7 +155,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testMessageAsMethodArgument() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
QueueChannel requestChannel = new QueueChannel();
startResponder(requestChannel);
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean();
proxyFactory.setServiceInterface(TestService.class);
@@ -168,7 +168,7 @@ public class GatewayProxyFactoryBeanTests {
@Test
public void testMessageAsReturnValue() throws Exception {
final MessageChannel requestChannel = new QueueChannel();
final QueueChannel requestChannel = new QueueChannel();
new Thread(new Runnable() {
public void run() {
Message<?> input = requestChannel.receive();
@@ -212,7 +212,7 @@ public class GatewayProxyFactoryBeanTests {
}
private static void startResponder(final MessageChannel requestChannel) {
private static void startResponder(final PollableChannel requestChannel) {
new Thread(new Runnable() {
public void run() {
Message<?> input = requestChannel.receive();

View File

@@ -32,6 +32,7 @@ import org.junit.Test;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageHeaders;
@@ -47,7 +48,7 @@ public class SimpleMessagingGatewayTests {
private MessageChannel requestChannel = createMock(MessageChannel.class);
private MessageChannel replyChannel = createMock(MessageChannel.class);
private PollableChannel replyChannel = createMock(PollableChannel.class);
private Message<?> messageMock = createMock(Message.class);

View File

@@ -25,6 +25,7 @@ import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.gateway.TestService;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
@@ -40,7 +41,7 @@ public class GatewayParserTests {
ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass());
TestService service = (TestService) context.getBean("oneWay");
service.oneWay("foo");
MessageChannel channel = (MessageChannel) context.getBean("requestChannel");
PollableChannel channel = (PollableChannel) context.getBean("requestChannel");
Message<?> result = channel.receive(1000);
assertEquals("foo", result.getPayload());
}
@@ -48,7 +49,7 @@ public class GatewayParserTests {
@Test
public void testSolicitResponse() {
ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass());
MessageChannel channel = (MessageChannel) context.getBean("replyChannel");
PollableChannel channel = (PollableChannel) context.getBean("replyChannel");
channel.send(new StringMessage("foo"));
TestService service = (TestService) context.getBean("solicitResponse");
String result = service.solicitResponse();
@@ -58,7 +59,7 @@ public class GatewayParserTests {
@Test
public void testRequestReply() {
ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass());
MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel");
PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReply");
@@ -69,7 +70,7 @@ public class GatewayParserTests {
@Test
public void testRequestReplyWithMessageMapper() {
ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass());
MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel");
PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReplyWithMessageMapper");
@@ -80,7 +81,7 @@ public class GatewayParserTests {
@Test
public void testRequestReplyWithMessageCreator() {
ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass());
MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel");
PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel");
MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel");
this.startResponder(requestChannel, replyChannel);
TestService service = (TestService) context.getBean("requestReplyWithMessageCreator");
@@ -89,7 +90,7 @@ public class GatewayParserTests {
}
private void startResponder(final MessageChannel requestChannel, final MessageChannel replyChannel) {
private void startResponder(final PollableChannel requestChannel, final MessageChannel replyChannel) {
Executors.newSingleThreadExecutor().execute(new Runnable() {
public void run() {
Message<?> request = requestChannel.receive();

View File

@@ -23,7 +23,6 @@ import org.junit.Test;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.DefaultChannelRegistry;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
@@ -117,7 +116,7 @@ public class CorrelationIdTests {
adapter.setObject(new TestBean());
adapter.setMethodName("upperCase");
adapter.afterPropertiesSet();
MessageChannel testChannel = new QueueChannel();
QueueChannel testChannel = new QueueChannel();
ChannelRegistry channelRegistry = new DefaultChannelRegistry();
channelRegistry.registerChannel("testChannel", testChannel);
SplitterMessageHandlerAdapter splitter = new SplitterMessageHandlerAdapter(

View File

@@ -38,15 +38,15 @@ import org.springframework.integration.message.StringMessage;
*/
public class RootCauseErrorMessageRouterTests {
private MessageChannel illegalArgumentChannel = new QueueChannel();
private QueueChannel illegalArgumentChannel = new QueueChannel();
private MessageChannel runtimeExceptionChannel = new QueueChannel();
private QueueChannel runtimeExceptionChannel = new QueueChannel();
private MessageChannel messageHandlingExceptionChannel = new QueueChannel();
private QueueChannel messageHandlingExceptionChannel = new QueueChannel();
private MessageChannel messageDeliveryExceptionChannel = new QueueChannel();
private QueueChannel messageDeliveryExceptionChannel = new QueueChannel();
private MessageChannel defaultChannel = new QueueChannel();
private QueueChannel defaultChannel = new QueueChannel();
@Test

View File

@@ -23,6 +23,7 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -37,8 +38,8 @@ public class RouterParserTests {
"routerParserTests.xml", this.getClass());
context.start();
MessageChannel input = (MessageChannel) context.getBean("input");
MessageChannel output1 = (MessageChannel) context.getBean("output1");
MessageChannel output2 = (MessageChannel) context.getBean("output2");
PollableChannel output1 = (PollableChannel) context.getBean("output1");
PollableChannel output2 = (PollableChannel) context.getBean("output2");
input.send(new StringMessage("1"));
Message<?> result1 = output1.receive(1000);
assertEquals("1", result1.getPayload());

View File

@@ -28,6 +28,7 @@ import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -44,7 +45,7 @@ public class SplitterAggregatorTests {
ApplicationContext context = new ClassPathXmlApplicationContext(
"splitterAggregatorTests.xml", this.getClass());
MessageChannel inputChannel = (MessageChannel) context.getBean("numbers");
MessageChannel outputChannel = (MessageChannel) context.getBean("results");
PollableChannel outputChannel = (PollableChannel) context.getBean("results");
inputChannel.send(new GenericMessage<Numbers>(this.nextTen()));
Message<?> result1 = outputChannel.receive(1000);
assertNotNull(result1);

View File

@@ -23,6 +23,7 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.StringMessage;
@@ -37,7 +38,7 @@ public class SplitterParserTests {
"splitterParserTests.xml", this.getClass());
context.start();
MessageChannel channel1 = (MessageChannel) context.getBean("channel1");
MessageChannel channel2 = (MessageChannel) context.getBean("channel2");
PollableChannel channel2 = (PollableChannel) context.getBean("channel2");
channel1.send(new StringMessage("this.is.a.test"));
Message<?> result1 = channel2.receive(1000);
assertEquals("this", result1.getPayload());

View File

@@ -21,7 +21,6 @@ import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.Message;
@@ -32,7 +31,7 @@ import org.springframework.integration.message.StringMessage;
*/
public class MessageTransformingChannelInterceptorTests {
private AbstractMessageChannel channel;
private QueueChannel channel;
private StringMessage message;