diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/AbstractDirectorySource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/AbstractDirectorySource.java index cddf68fbd9..69c2d978be 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/AbstractDirectorySource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/AbstractDirectorySource.java @@ -16,7 +16,6 @@ package org.springframework.integration.adapter.file; -import java.io.File; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -24,24 +23,21 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageCreator; import org.springframework.integration.message.MessageDeliveryAware; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.util.Assert; -import org.springframework.util.StringUtils; /** - * Base class for implementing a Source that creates messages from files in a - * directory, either local or remote. + * Base class for implementing a PollableSource that creates messages + * from files in a directory, either local or remote. * * @author Marius Bogoevici - * @author iwein + * @author Iwein Fuld */ -public abstract class AbstractDirectorySource implements MessageSource, MessageDeliveryAware { +public abstract class AbstractDirectorySource implements PollableSource, MessageDeliveryAware { public final static String FILE_INFO_PROPERTY = "file.info"; diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/FileSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/FileSource.java index 5f5e6ab504..a197f04b24 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/FileSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/file/FileSource.java @@ -24,6 +24,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.core.io.Resource; import org.springframework.integration.ConfigurationException; import org.springframework.integration.message.Message; @@ -31,7 +32,6 @@ import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageCreator; import org.springframework.integration.message.MessageDeliveryAware; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageSource; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -41,7 +41,7 @@ import org.springframework.util.StringUtils; * @author Mark Fisher * @author Marius Bogoevici */ -public class FileSource extends AbstractDirectorySource implements MessageSource, MessageDeliveryAware { +public class FileSource extends AbstractDirectorySource implements MessageDeliveryAware { private final Log logger = LogFactory.getLog(this.getClass()); diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/jms/JmsSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/jms/JmsSource.java index 75cf64f236..e1e86aeb9c 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/jms/JmsSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/jms/JmsSource.java @@ -21,7 +21,7 @@ import javax.jms.Destination; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.jms.core.JmsTemplate; /** @@ -32,7 +32,7 @@ import org.springframework.jms.core.JmsTemplate; * * @author Mark Fisher */ -public class JmsSource extends AbstractJmsTemplateBasedAdapter implements MessageSource { +public class JmsSource extends AbstractJmsTemplateBasedAdapter implements PollableSource { public JmsSource(JmsTemplate jmsTemplate) { super(jmsTemplate); diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/PollingMailSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/PollingMailSource.java index ab0642c70c..c82566175b 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/PollingMailSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/mail/PollingMailSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,7 @@ import javax.mail.internet.MimeMessage; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.beans.factory.DisposableBean; import org.springframework.context.Lifecycle; import org.springframework.integration.adapter.mail.monitor.DefaultLocalMailMessageStore; @@ -34,20 +35,19 @@ import org.springframework.integration.adapter.mail.monitor.LocalMailMessageStor import org.springframework.integration.adapter.mail.monitor.MailTransportUtils; import org.springframework.integration.adapter.mail.monitor.MonitoringStrategy; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.util.Assert; /** * {@link MessageSource} implementation which delegates to a - * {@link MonitoringStrategy} to poll a mailbox Each poll of the mailbox may + * {@link MonitoringStrategy} to poll a mailbox. Each poll of the mailbox may * return more than one message which will then be stored locally using the * provided {@link LocalMailMessageStore} - * @author Jonas Partner * + * @author Jonas Partner */ - @SuppressWarnings("unchecked") -public class PollingMailSource implements MessageSource, DisposableBean, Lifecycle { +public class PollingMailSource implements PollableSource, DisposableBean, Lifecycle { private final Log logger = LogFactory.getLog(this.getClass()); diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamSource.java index 3015eea971..6010667bfb 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamSource.java @@ -23,14 +23,14 @@ import java.io.InputStream; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; /** * A pollable source for receiving bytes from an {@link InputStream}. * * @author Mark Fisher */ -public class ByteStreamSource implements MessageSource { +public class ByteStreamSource implements PollableSource { private BufferedInputStream stream; diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamSource.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamSource.java index 0d5da4d363..a87dd1c6cf 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamSource.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamSource.java @@ -24,7 +24,7 @@ import java.io.UnsupportedEncodingException; import org.springframework.integration.ConfigurationException; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.message.StringMessage; import org.springframework.util.Assert; @@ -33,7 +33,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class CharacterStreamSource implements MessageSource { +public class CharacterStreamSource implements PollableSource { private final BufferedReader reader; diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/event/ApplicationEventSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/event/ApplicationEventSourceTests.java index 129d0039e6..00a9a22735 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/event/ApplicationEventSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/event/ApplicationEventSourceTests.java @@ -31,7 +31,7 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.ContextStartedEvent; import org.springframework.context.event.ContextStoppedEvent; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -42,7 +42,7 @@ public class ApplicationEventSourceTests { @Test public void testAnyApplicationEventSentByDefault() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); ApplicationEventSource adapter = new ApplicationEventSource(channel); Message message1 = channel.receive(0); assertNull(message1); @@ -58,7 +58,7 @@ public class ApplicationEventSourceTests { @Test public void testOnlyConfiguredEventTypesAreSent() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); ApplicationEventSource adapter = new ApplicationEventSource(channel); List> eventTypes = new ArrayList>(); eventTypes.add(TestApplicationEvent1.class); @@ -77,7 +77,7 @@ public class ApplicationEventSourceTests { @Test public void testApplicationContextEvents() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationEventSourceTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("channel"); + PollableChannel channel = (PollableChannel) context.getBean("channel"); Message refreshedEventMessage = channel.receive(0); assertNotNull(refreshedEventMessage); assertEquals(ContextRefreshedEvent.class, refreshedEventMessage.getPayload().getClass()); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/httpinvoker/HttpInvokerGatewayTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/httpinvoker/HttpInvokerGatewayTests.java index ca058de516..8f22e745d2 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/httpinvoker/HttpInvokerGatewayTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/httpinvoker/HttpInvokerGatewayTests.java @@ -29,7 +29,6 @@ import java.util.concurrent.Executors; import org.junit.Test; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; @@ -46,7 +45,7 @@ public class HttpInvokerGatewayTests { @Test public void testRequestOnly() throws Exception { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); HttpInvokerGateway gateway = new HttpInvokerGateway(channel); gateway.setExpectReply(false); gateway.afterPropertiesSet(); @@ -61,7 +60,7 @@ public class HttpInvokerGatewayTests { @Test public void testRequestReply() throws Exception { - final MessageChannel channel = new QueueChannel(); + final QueueChannel channel = new QueueChannel(); Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { Message message = channel.receive(); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsGatewayParserTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsGatewayParserTests.java index c51742135c..f84c6f8243 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsGatewayParserTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsGatewayParserTests.java @@ -27,7 +27,6 @@ import org.springframework.beans.factory.BeanCreationException; import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.adapter.jms.JmsGateway; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.jms.connection.JmsTransactionManager; @@ -42,7 +41,7 @@ public class JmsGatewayParserTests { public void testGatewayWithConnectionFactoryAndDestination() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jmsGatewayWithConnectionFactoryAndDestination.xml", this.getClass()); - MessageChannel channel = new QueueChannel(1); + QueueChannel channel = new QueueChannel(1); JmsGateway gateway = (JmsGateway) context.getBean("jmsGateway"); gateway.setRequestChannel(channel); context.start(); @@ -56,7 +55,7 @@ public class JmsGatewayParserTests { public void testGatewayWithConnectionFactoryAndDestinationName() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jmsGatewayWithConnectionFactoryAndDestinationName.xml", this.getClass()); - MessageChannel channel = new QueueChannel(1); + QueueChannel channel = new QueueChannel(1); JmsGateway gateway = (JmsGateway) context.getBean("jmsGateway"); gateway.setRequestChannel(channel); context.start(); @@ -70,7 +69,7 @@ public class JmsGatewayParserTests { public void testGatewayWithMessageConverter() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jmsGatewayWithMessageConverter.xml", this.getClass()); - MessageChannel channel = new QueueChannel(1); + QueueChannel channel = new QueueChannel(1); JmsGateway gateway = (JmsGateway) context.getBean("jmsGateway"); gateway.setRequestChannel(channel); context.start(); @@ -133,7 +132,7 @@ public class JmsGatewayParserTests { public void testGatewayWithDefaultConnectionFactory() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jmsGatewayWithDefaultConnectionFactory.xml", this.getClass()); - MessageChannel channel = new QueueChannel(1); + QueueChannel channel = new QueueChannel(1); JmsGateway gateway = (JmsGateway) context.getBean("jmsGateway"); gateway.setRequestChannel(channel); context.start(); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsSourceParserTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsSourceParserTests.java index bb83a207fa..a929b441cb 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsSourceParserTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/jms/config/JmsSourceParserTests.java @@ -26,7 +26,7 @@ import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.adapter.jms.JmsSource; -import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; /** @@ -131,7 +131,7 @@ public class JmsSourceParserTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jmsSourceEndpoint.xml", this.getClass()); context.start(); - MessageChannel channel = (MessageChannel) context.getBean("channel"); + PollableChannel channel = (PollableChannel) context.getBean("channel"); Message message = channel.receive(3000); assertNotNull("message should not be null", message); assertEquals("polling-test", message.getPayload()); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java index e380bf921a..6aaa2bc0e0 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertNull; import java.io.ByteArrayInputStream; import org.junit.Test; -import org.springframework.integration.channel.MessageChannel; + import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.SourceEndpoint; import org.springframework.integration.endpoint.TriggerMessage; @@ -38,7 +38,7 @@ public class ByteStreamSourceTests { public void testEndOfStream() { byte[] bytes = new byte[] {1,2,3}; ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); ByteStreamSource source = new ByteStreamSource(stream); SourceEndpoint endpoint = new SourceEndpoint(source); endpoint.setTarget(channel); @@ -61,7 +61,7 @@ public class ByteStreamSourceTests { public void testByteArrayIsTruncated() { byte[] bytes = new byte[] {0,1,2,3,4,5}; ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); ByteStreamSource source = new ByteStreamSource(stream); source.setBytesPerMessage(4); PollingSchedule schedule = new PollingSchedule(1000); @@ -83,7 +83,7 @@ public class ByteStreamSourceTests { public void testByteArrayIsNotTruncated() { byte[] bytes = new byte[] {0,1,2,3,4,5}; ByteArrayInputStream stream = new ByteArrayInputStream(bytes); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); ByteStreamSource source = new ByteStreamSource(stream); source.setBytesPerMessage(4); source.setShouldTruncate(false); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java index 0323ffefab..89e7f39597 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java @@ -24,7 +24,6 @@ import java.io.IOException; import org.junit.Before; import org.junit.Test; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dispatcher.BroadcastingDispatcher; import org.springframework.integration.dispatcher.PollingDispatcher; @@ -37,7 +36,7 @@ import org.springframework.integration.scheduling.PollingSchedule; */ public class ByteStreamTargetTests { - private MessageChannel channel; + private QueueChannel channel; private PollingDispatcher dispatcher; diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java index fa387e914e..1e4bbec690 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java @@ -22,7 +22,7 @@ import static org.junit.Assert.assertNull; import java.io.StringReader; import org.junit.Test; -import org.springframework.integration.channel.MessageChannel; + import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.SourceEndpoint; import org.springframework.integration.endpoint.TriggerMessage; @@ -37,7 +37,7 @@ public class CharacterStreamSourceTests { @Test public void testEndOfStream() { StringReader reader = new StringReader("test"); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); CharacterStreamSource source = new CharacterStreamSource(reader); PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java index bdb65ef480..348d029298 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java @@ -23,7 +23,6 @@ import java.io.StringWriter; import org.junit.Before; import org.junit.Test; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dispatcher.BroadcastingDispatcher; import org.springframework.integration.dispatcher.PollingDispatcher; @@ -36,7 +35,7 @@ import org.springframework.integration.scheduling.PollingSchedule; */ public class CharacterStreamTargetTests { - private MessageChannel channel; + private QueueChannel channel; private PollingDispatcher dispatcher; diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/HelloWorldDemo.java b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/HelloWorldDemo.java index 0da4d349c5..1f56d3157d 100644 --- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/HelloWorldDemo.java +++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/helloworld/HelloWorldDemo.java @@ -20,6 +20,7 @@ import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.config.MessageBusParser; import org.springframework.integration.message.StringMessage; @@ -34,7 +35,7 @@ public class HelloWorldDemo { AbstractApplicationContext context = new ClassPathXmlApplicationContext("helloWorldDemo.xml", HelloWorldDemo.class); ChannelRegistry channelRegistry = (ChannelRegistry) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); MessageChannel inputChannel = channelRegistry.lookupChannel("inputChannel"); - MessageChannel outputChannel = channelRegistry.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) channelRegistry.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("World")); System.out.println(outputChannel.receive().getPayload()); } diff --git a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java index e093ca7205..f6cdedbd57 100644 --- a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java +++ b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/ChannelInterceptorRegisteringBeanPostProcessorTests.java @@ -25,7 +25,7 @@ import org.easymock.EasyMock; import org.junit.Before; import org.junit.Test; -import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.integration.channel.AbstractPollableChannel; import org.springframework.integration.channel.ChannelInterceptor; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.interceptor.ChannelInterceptorAdapter; @@ -44,7 +44,6 @@ public class ChannelInterceptorRegisteringBeanPostProcessorTests { public void setUp() { matchAll = new ArrayList(); matchAll.add(".*"); - } @@ -81,7 +80,7 @@ public class ChannelInterceptorRegisteringBeanPostProcessorTests { } - private static class TestChannel extends AbstractMessageChannel { + private static class TestChannel extends AbstractPollableChannel { ChannelInterceptor channelInterceptor; diff --git a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java index e87d62b9a8..39db3112db 100644 --- a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java +++ b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecuredChannelsParserTests.java @@ -23,7 +23,8 @@ import java.util.List; import org.junit.Before; import org.junit.Test; -import org.springframework.integration.channel.AbstractMessageChannel; + +import org.springframework.integration.channel.AbstractPollableChannel; import org.springframework.integration.channel.ChannelInterceptor; import org.springframework.integration.message.Message; import org.springframework.integration.message.selector.MessageSelector; @@ -111,7 +112,7 @@ public class SecuredChannelsParserTests extends AbstractJUnit4SpringContextTests } - static class TestMessageChannel extends AbstractMessageChannel { + static class TestMessageChannel extends AbstractPollableChannel { List interceptors = new ArrayList(); diff --git a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecurityPropagatingChannelsParserTests.java b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecurityPropagatingChannelsParserTests.java index 33b253bd4a..c415cbc8c3 100644 --- a/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecurityPropagatingChannelsParserTests.java +++ b/org.springframework.integration.security/src/test/java/org/springframework/integration/security/config/SecurityPropagatingChannelsParserTests.java @@ -20,9 +20,10 @@ import static org.junit.Assert.*; import org.junit.After; import org.junit.Test; + import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.StringMessage; import org.springframework.security.context.SecurityContext; @@ -48,7 +49,7 @@ public class SecurityPropagatingChannelsParserTests { @Test public void testPropagationByDefault() { loadApplicationContext(this.getClass().getSimpleName() + "-propagateByDefaultContext.xml"); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); applicationContext.getAutowireCapableBeanFactory().applyBeanPostProcessorsAfterInitialization(channel, "Does not matter"); assertTrue("security context did not propagate by setting message bus level default", @@ -67,13 +68,13 @@ public class SecurityPropagatingChannelsParserTests { @Test public void testNoPropagationWithExcludedChannel() { loadApplicationContext(this.getClass().getSimpleName() + "-noPropagationByDefaultContext.xml"); - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); applicationContext.getAutowireCapableBeanFactory().applyBeanPostProcessorsAfterInitialization(channel, "adminSpecial"); assertFalse("security context propagated when channel excluded", channelPropagatesSecurityContext(channel)); } - private boolean channelPropagatesSecurityContext(MessageChannel channel) { + private boolean channelPropagatesSecurityContext(PollableChannel channel) { login("bob", "bobspassword"); channel.send(new StringMessage("testMessage")); SecurityContext context = (SecurityContext) channel.receive(-1).getHeaders().get( diff --git a/org.springframework.integration.xml/src/main/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolver.java b/org.springframework.integration.xml/src/main/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolver.java index 29c449e703..fc1eeaad97 100644 --- a/org.springframework.integration.xml/src/main/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolver.java +++ b/org.springframework.integration.xml/src/main/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolver.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,16 +16,15 @@ package org.springframework.integration.xml.router; +import org.w3c.dom.Node; + import org.springframework.integration.message.Message; import org.springframework.integration.router.ChannelNameResolver; import org.springframework.util.Assert; import org.springframework.xml.xpath.XPathExpression; -import org.w3c.dom.Node; /** - * * @author Jonas Partner - * */ public class XPathSingleChannelNameResolver extends AbstractXPathChannelNameResolver implements ChannelNameResolver { @@ -38,7 +37,6 @@ public class XPathSingleChannelNameResolver extends AbstractXPathChannelNameReso public String resolve(Message message) { Node node = extractNode(message); - System.out.println(xPathExpression.evaluateAsString(node)); return xPathExpression.evaluateAsString(node); } diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml index a85aacf7bb..16ccdff0ac 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests-context.xml @@ -10,12 +10,12 @@ + - diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java index e2c9bd2faf..f12678b042 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/config/XPathRouterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,65 +19,66 @@ package org.springframework.integration.xml.config; import static org.junit.Assert.assertNotNull; import org.junit.Test; +import org.w3c.dom.Document; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.xml.util.XmlTestUtil; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; -import org.w3c.dom.Document; - +/** + * @author Jonas Partner + */ @ContextConfiguration public class XPathRouterParserTests extends AbstractJUnit4SpringContextTests{ - + @Autowired @Qualifier("inputOne") MessageChannel inputOne; - + @Autowired @Qualifier("inputTwo") MessageChannel inputTwo; - @Autowired @Qualifier("outputOne") - MessageChannel outputOne; - + PollableChannel outputOne; @Autowired @Qualifier("outputTwo") - MessageChannel outputTwo; - + PollableChannel outputTwo; + @Autowired @Qualifier("errors") - MessageChannel errors; - - - - + PollableChannel errors; + + @SuppressWarnings("unchecked") @Test public void testOutputOne() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputOne"); + Document doc = XmlTestUtil.getDocumentForString("outputOne"); GenericMessage docMessage = new GenericMessage(doc); inputOne.send(docMessage); - GenericMessage received = (GenericMessage)outputOne.receive(1000); + GenericMessage received = (GenericMessage) outputOne.receive(1000); assertNotNull("Did not recevie message from outputOne", received); } @SuppressWarnings("unchecked") @Test public void testOutputTwo() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputTwo"); + Document doc = XmlTestUtil.getDocumentForString("outputTwo"); GenericMessage docMessage = new GenericMessage(doc); inputOne.send(docMessage); - GenericMessage received = (GenericMessage)outputTwo.receive(1000); + GenericMessage received = (GenericMessage) outputTwo.receive(1000); assertNotNull("Did not recevie message from two", received); } + @SuppressWarnings("unchecked") @Test public void testOutputThree() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputThree"); + Document doc = XmlTestUtil.getDocumentForString("outputThree"); GenericMessage docMessage = new GenericMessage(doc); inputOne.send(docMessage); - GenericMessage received = (GenericMessage)errors.receive(1000); + GenericMessage received = (GenericMessage) errors.receive(1000); assertNotNull("Did not recevie message on errors", received); } @@ -85,32 +86,31 @@ public class XPathRouterParserTests extends AbstractJUnit4SpringContextTests{ @SuppressWarnings("unchecked") @Test public void testOutputOneMulti() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputOne"); + Document doc = XmlTestUtil.getDocumentForString("outputOne"); GenericMessage docMessage = new GenericMessage(doc); inputTwo.send(docMessage); - GenericMessage received = (GenericMessage)outputOne.receive(1000); + GenericMessage received = (GenericMessage) outputOne.receive(1000); assertNotNull("Did not recevie message from outputOne", received); } @SuppressWarnings("unchecked") @Test public void testOutputOneAndTwoMulti() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputOneoutputTwo"); + Document doc = XmlTestUtil.getDocumentForString("outputOneoutputTwo"); GenericMessage docMessage = new GenericMessage(doc); inputTwo.send(docMessage); - GenericMessage received = (GenericMessage)outputTwo.receive(1000); + GenericMessage received = (GenericMessage) outputTwo.receive(1000); assertNotNull("Did not recevie message from two", received); } + @SuppressWarnings("unchecked") @Test public void testOutputThreeMulti() throws Exception{ - Document doc =XmlTestUtil.getDocumentForString("outputThree"); + Document doc = XmlTestUtil.getDocumentForString("outputThree"); GenericMessage docMessage = new GenericMessage(doc); inputTwo.send(docMessage); GenericMessage received = (GenericMessage) errors.receive(1000); assertNotNull("Did not recevie message on errors", received); - } - - + } diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathMultiChannelNameResolverTests.java b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathMultiChannelNameResolverTests.java similarity index 96% rename from org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathMultiChannelNameResolverTests.java rename to org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathMultiChannelNameResolverTests.java index 220e1a62bc..1b78f02136 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathMultiChannelNameResolverTests.java +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathMultiChannelNameResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,13 @@ * limitations under the License. */ -package org.springframework.integration.router; +package org.springframework.integration.xml.router; import static org.junit.Assert.assertEquals; import org.junit.Test; +import org.w3c.dom.Document; + import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; @@ -26,12 +28,9 @@ import org.springframework.integration.xml.router.XPathMultiChannelNameResolver; import org.springframework.integration.xml.util.XmlTestUtil; import org.springframework.xml.xpath.XPathExpression; import org.springframework.xml.xpath.XPathExpressionFactory; -import org.w3c.dom.Document; -/** - * +/** * @author Jonas Partner - * */ public class XPathMultiChannelNameResolverTests { diff --git a/org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathSingleChannelNameResolverTests.java b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolverTests.java similarity index 94% rename from org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathSingleChannelNameResolverTests.java rename to org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolverTests.java index f121d9aaf7..534b7f3533 100644 --- a/org.springframework.integration.xml/src/test/java/org/springframework/integration/router/XPathSingleChannelNameResolverTests.java +++ b/org.springframework.integration.xml/src/test/java/org/springframework/integration/xml/router/XPathSingleChannelNameResolverTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,13 @@ * limitations under the License. */ -package org.springframework.integration.router; +package org.springframework.integration.xml.router; import static org.junit.Assert.assertEquals; import org.junit.Test; +import org.w3c.dom.Document; + import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.StringMessage; @@ -26,12 +28,9 @@ import org.springframework.integration.xml.router.XPathSingleChannelNameResolver import org.springframework.integration.xml.util.XmlTestUtil; import org.springframework.xml.xpath.XPathExpression; import org.springframework.xml.xpath.XPathExpressionFactory; -import org.w3c.dom.Document; /** - * * @author Jonas Partner - * */ public class XPathSingleChannelNameResolverTests { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java index 411ef9d7c2..0f02581dff 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultChannelFactoryBean.java @@ -31,6 +31,7 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.integration.channel.ChannelInterceptor; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.factory.ChannelFactory; import org.springframework.integration.channel.factory.QueueChannelFactory; import org.springframework.util.Assert; @@ -73,11 +74,11 @@ public class DefaultChannelFactoryBean implements ApplicationContextAware, Facto } public void afterPropertiesSet() throws Exception { - synchronized (initializationMonitor) { + synchronized (this.initializationMonitor) { if (!initialized) { this.proxyBean = Proxy.newProxyInstance( getClass().getClassLoader(), - new Class[]{MessageChannel.class}, + new Class[] { PollableChannel.class }, new DefaultChannelInvocationHandler()); this.initialized = true; } @@ -85,14 +86,14 @@ public class DefaultChannelFactoryBean implements ApplicationContextAware, Facto } public Object getObject() throws Exception { - if (!initialized) { + if (!this.initialized) { afterPropertiesSet(); } return proxyBean; } public Class getObjectType() { - return MessageChannel.class; + return PollableChannel.class; } public boolean isSingleton() { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index dd2398f3af..a12100d426 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -56,7 +56,7 @@ import org.springframework.integration.endpoint.TargetEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.MessagePublishingErrorHandler; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.integration.scheduling.Schedule; @@ -330,8 +330,8 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A endpoint.setSource(source); } } - if (source != null && source instanceof Subscribable) { - ((Subscribable) source).subscribe(endpoint); + if (source != null && source instanceof SubscribableSource) { + ((SubscribableSource) source).subscribe(endpoint); if (logger.isInfoEnabled()) { logger.info("activated subscription to channel '" + source + "' for endpoint '" + endpoint + "'"); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java index f4d4e6876a..49dd54ae33 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java @@ -81,6 +81,13 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName this.interceptors.add(interceptor); } + /** + * Exposes the interceptor list for subclasses. + */ + protected ChannelInterceptorList getInterceptors() { + return this.interceptors; + } + /** * Send a message on this channel. If the channel is at capacity, this * method will block until either space becomes available or the sending @@ -119,44 +126,10 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName return sent; } - /** - * Receive the first available message from this channel. If the channel - * contains no messages, this method will block. - * - * @return the first available message or null if the - * receiving thread is interrupted. - */ - public final Message receive() { - return this.receive(-1); - } - - /** - * Receive the first available message from this channel. If the channel - * contains no messages, this method will block until the allotted timeout - * elapses. If the specified timeout is 0, the method will return - * immediately. If less than zero, it will block indefinitely (see - * {@link #receive()}). - * - * @param timeout the timeout in milliseconds - * - * @return the first available message or null if no message - * is available within the allotted time or the receiving thread is - * interrupted. - */ - public final Message receive(long timeout) { - if (!this.interceptors.preReceive(this)) { - return null; - } - Message message = this.doReceive(timeout); - message = this.interceptors.postReceive(message, this); - return message; - } - public String toString() { return (this.name != null) ? this.name : super.toString(); } - /** * Subclasses must implement this method. A non-negative timeout indicates * how long to wait if the channel is at capacity (if the value is 0, it @@ -166,20 +139,11 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName */ protected abstract boolean doSend(Message message, long timeout); - /** - * Subclasses must implement this method. A non-negative timeout indicates - * how long to wait if the channel is empty (if the value is 0, it must - * return immediately with or without success). A negative timeout value - * indicates that the method should block until either a message is - * available or the blocking thread is interrupted. - */ - protected abstract Message doReceive(long timeout); - /** * A convenience wrapper class for the list of ChannelInterceptors. */ - private class ChannelInterceptorList { + protected class ChannelInterceptorList { private final List interceptors = new CopyOnWriteArrayList(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java new file mode 100644 index 0000000000..15e6fd50d2 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java @@ -0,0 +1,71 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.channel; + +import org.springframework.integration.message.Message; + +/** + * Base class for all pollable channels. + * + * @author Mark Fisher + */ +public abstract class AbstractPollableChannel extends AbstractMessageChannel implements PollableChannel { + + /** + * Receive the first available message from this channel. If the channel + * contains no messages, this method will block. + * + * @return the first available message or null if the + * receiving thread is interrupted. + */ + public final Message receive() { + return this.receive(-1); + } + + /** + * Receive the first available message from this channel. If the channel + * contains no messages, this method will block until the allotted timeout + * elapses. If the specified timeout is 0, the method will return + * immediately. If less than zero, it will block indefinitely (see + * {@link #receive()}). + * + * @param timeout the timeout in milliseconds + * + * @return the first available message or null if no message + * is available within the allotted time or the receiving thread is + * interrupted. + */ + public final Message receive(long timeout) { + if (!this.getInterceptors().preReceive(this)) { + return null; + } + Message message = this.doReceive(timeout); + message = this.getInterceptors().postReceive(message, this); + return message; + } + + /** + * Subclasses must implement this method. A non-negative timeout indicates + * how long to wait if the channel is empty (if the value is 0, it must + * return immediately with or without success). A negative timeout value + * indicates that the method should block until either a message is + * available or the blocking thread is interrupted. + */ + protected abstract Message doReceive(long timeout); + + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelPurger.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelPurger.java index fe68861876..c3982bd3e2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelPurger.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ChannelPurger.java @@ -25,7 +25,7 @@ import org.springframework.util.Assert; /** * A utility class for purging {@link Message Messages} from one or more - * {@link MessageChannel MessageChannels}. Any message that does not + * {@link PollableChannel PollableChannels}. Any message that does not * match the provided {@link MessageSelector} will be removed from the channel. * If no {@link MessageSelector} is provided, then all messages will be * cleared from the channel. @@ -41,16 +41,16 @@ import org.springframework.util.Assert; */ public class ChannelPurger { - private final MessageChannel[] channels; + private final PollableChannel[] channels; private final MessageSelector selector; - public ChannelPurger(MessageChannel ... channels) { + public ChannelPurger(PollableChannel ... channels) { this(null, channels); } - public ChannelPurger(MessageSelector selector, MessageChannel ... channels) { + public ChannelPurger(MessageSelector selector, PollableChannel ... channels) { Assert.notEmpty(channels, "at least one channel is required"); if (channels.length == 1) { Assert.notNull(channels[0], "channel must not be null"); @@ -62,7 +62,7 @@ public class ChannelPurger { public final List> purge() { List> purgedMessages = new ArrayList>(); - for (MessageChannel channel : this.channels) { + for (PollableChannel channel : this.channels) { List> results = (this.selector == null) ? channel.clear() : channel.purge(this.selector); if (results != null) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java index 767324a172..bc8ab8ec34 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessageChannel.java @@ -16,19 +16,15 @@ package org.springframework.integration.channel; -import java.util.List; - -import org.springframework.integration.message.BlockingSource; import org.springframework.integration.message.BlockingTarget; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.selector.MessageSelector; +import org.springframework.integration.message.MessageSource; /** * Base channel interface defining common behavior for message sending and receiving. * * @author Mark Fisher */ -public interface MessageChannel extends BlockingSource, BlockingTarget { +public interface MessageChannel extends MessageSource, BlockingTarget { /** * Return the name of this channel. @@ -40,14 +36,4 @@ public interface MessageChannel extends BlockingSource, BlockingTarget { */ void setName(String name); - /** - * Remove all {@link Message Messages} from this channel. - */ - List> clear(); - - /** - * Remove any {@link Message Messages} that are not accepted by the provided selector. - */ - List> purge(MessageSelector selector); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PollableChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PollableChannel.java new file mode 100644 index 0000000000..5d8536ecd5 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PollableChannel.java @@ -0,0 +1,40 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.channel; + +import java.util.List; + +import org.springframework.integration.message.BlockingSource; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.selector.MessageSelector; + +/** + * @author Mark Fisher + */ +public interface PollableChannel extends MessageChannel, BlockingSource { + + /** + * Remove all {@link Message Messages} from this channel. + */ + List> clear(); + + /** + * Remove any {@link Message Messages} that are not accepted by the provided selector. + */ + List> purge(MessageSelector selector); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java index 7545365e79..222b7126b0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/QueueChannel.java @@ -35,7 +35,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class QueueChannel extends AbstractMessageChannel { +public class QueueChannel extends AbstractPollableChannel { public static final int DEFAULT_CAPACITY = 100; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java index e252d59d14..f8c2657fa6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/ThreadLocalChannel.java @@ -33,7 +33,7 @@ import org.springframework.integration.message.selector.MessageSelector; * @author Dave Syer * @author Mark Fisher */ -public class ThreadLocalChannel extends AbstractMessageChannel { +public class ThreadLocalChannel extends AbstractPollableChannel { private static final ThreadLocalMessageHolder messageHolder = new ThreadLocalMessageHolder(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java index 27504bcf75..f2d0a7c6a0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/DirectChannelParser.java @@ -18,10 +18,8 @@ package org.springframework.integration.config; import org.w3c.dom.Element; -import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.integration.channel.config.AbstractChannelParser; import org.springframework.integration.dispatcher.DirectChannel; -import org.springframework.util.StringUtils; /** * Parser for the <direct-channel> element. @@ -35,12 +33,4 @@ public class DirectChannelParser extends AbstractChannelParser { return DirectChannel.class; } - @Override - protected void configureConstructorArgs(BeanDefinitionBuilder builder, Element element) { - String source = element.getAttribute("source"); - if (StringUtils.hasText(source)) { - builder.addConstructorArgReference(source); - } - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index 59e31eec64..4f5c0b544a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -79,19 +79,12 @@ - - - - + + + Defines a channel that invokes its handlers directly in the sender's thread. - - - - - - - - + + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java index f1118f8a28..36503bc2cd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/DirectChannel.java @@ -16,50 +16,28 @@ package org.springframework.integration.dispatcher; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; -import org.springframework.integration.message.Subscribable; +import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.selector.MessageSelector; /** - * A channel that invokes the subscribed {@link MessageHandler handler(s)} in a - * sender's thread (returning after at most one handles the message). If a - * {@link MessageSource} is provided, then that source will likewise be polled - * within a receiver's thread. + * A channel that invokes the subscribed {@link MessageHandler handler(s)} in + * the sender's thread (returning after at most one handles the message). * * @author Dave Syer * @author Mark Fisher */ -public class DirectChannel extends AbstractMessageChannel implements Subscribable { +public class DirectChannel extends AbstractMessageChannel implements SubscribableSource { - private volatile MessageSource source; - - private final SimpleDispatcher dispatcher; + private final SimpleDispatcher dispatcher = new SimpleDispatcher(); private final AtomicInteger handlerCount = new AtomicInteger(); - public DirectChannel() { - this(null); - } - - public DirectChannel(MessageSource source) { - this.source = source; - this.dispatcher = new SimpleDispatcher(); - } - - - public void setSource(MessageSource source) { - this.source = source; - } - public boolean subscribe(MessageTarget target) { boolean added = this.dispatcher.addTarget(target); if (added) { @@ -76,15 +54,6 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl return removed; } - - @Override - protected Message doReceive(long timeout) { - if (this.source != null) { - return this.source.receive(); - } - return null; - } - @Override protected boolean doSend(Message message, long timeout) { if (message != null && this.handlerCount.get() > 0) { @@ -93,12 +62,4 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl return false; } - public List> clear() { - return new ArrayList>(); - } - - public List> purge(MessageSelector selector) { - return new ArrayList>(); - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java index 21da91044d..8a22c28384 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PollingDispatcher.java @@ -19,8 +19,8 @@ package org.springframework.integration.dispatcher; import org.springframework.integration.message.BlockingSource; import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.scheduling.SchedulableTask; import org.springframework.integration.scheduling.Schedule; import org.springframework.util.Assert; @@ -30,7 +30,7 @@ import org.springframework.util.Assert; */ public class PollingDispatcher implements SchedulableTask { - private final MessageSource source; + private final PollableSource source; private final MessageDispatcher dispatcher; @@ -42,10 +42,10 @@ public class PollingDispatcher implements SchedulableTask { /** - * Create a PollingDispatcher for the provided {@link MessageSource}. + * Create a PollingDispatcher for the provided {@link PollableSource}. * It can be scheduled according to the specified {@link Schedule}. */ - public PollingDispatcher(MessageSource source, MessageDispatcher dispatcher, Schedule schedule) { + public PollingDispatcher(PollableSource source, MessageDispatcher dispatcher, Schedule schedule) { Assert.notNull(source, "source must not be null"); Assert.notNull(dispatcher, "dispatcher must not be null"); this.source = source; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java index 848faceb11..cba5b5c8ef 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/PublishSubscribeChannel.java @@ -16,19 +16,16 @@ package org.springframework.integration.dispatcher; -import java.util.List; - import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.Subscribable; -import org.springframework.integration.message.selector.MessageSelector; +import org.springframework.integration.message.SubscribableSource; /** * @author Mark Fisher */ -public class PublishSubscribeChannel extends AbstractMessageChannel implements Subscribable { +public class PublishSubscribeChannel extends AbstractMessageChannel implements SubscribableSource { private final BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); @@ -64,17 +61,4 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S return this.dispatcher.send(message); } - @Override - protected Message doReceive(long timeout) { - return null; - } - - public List> clear() { - return null; - } - - public List> purge(MessageSelector selector) { - return null; - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointPoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointPoller.java index 77c8b9678c..cb01cbf823 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointPoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointPoller.java @@ -16,7 +16,10 @@ package org.springframework.integration.endpoint; +import org.springframework.integration.ConfigurationException; import org.springframework.integration.message.MessageExchangeTemplate; +import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; /** * @author Mark Fisher @@ -25,13 +28,23 @@ public class EndpointPoller implements EndpointVisitor { private final MessageExchangeTemplate template; + public EndpointPoller() { this.template = new MessageExchangeTemplate(); this.template.setSendTimeout(0); } public void visitEndpoint(MessageEndpoint endpoint) { - template.receiveAndForward(endpoint.getSource(), endpoint); + MessageSource source = endpoint.getSource(); + if (source == null) { + throw new ConfigurationException("unable to poll for endpoint '" + + endpoint + "', source is null"); + } + if (!(source instanceof PollableSource)) { + throw new ConfigurationException("unable to poll for endpoint '" + + endpoint + ", source is not a PollableSource"); + } + this.template.receiveAndForward((PollableSource) source, endpoint); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointTrigger.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointTrigger.java index 287c603843..fdb6009f1c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointTrigger.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointTrigger.java @@ -19,7 +19,7 @@ package org.springframework.integration.endpoint; import org.springframework.integration.dispatcher.BroadcastingDispatcher; import org.springframework.integration.dispatcher.PollingDispatcher; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.integration.scheduling.Schedule; @@ -55,7 +55,7 @@ public class EndpointTrigger extends PollingDispatcher { } - private static class TriggerSource implements MessageSource { + private static class TriggerSource implements PollableSource { public Message receive() { return new TriggerMessage(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java index abaf164096..bab5e83aa7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java @@ -20,6 +20,7 @@ import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.MessageBusAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.EndpointRegistry; import org.springframework.integration.endpoint.HandlerEndpoint; import org.springframework.integration.endpoint.MessagingGateway; @@ -46,7 +47,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M private volatile MessageChannel requestChannel; - private volatile MessageChannel replyChannel; + private volatile PollableChannel replyChannel; private volatile long replyTimeout = 5000; @@ -87,7 +88,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M * * @param replyChannel the channel from which reply messages will be received */ - public void setReplyChannel(MessageChannel replyChannel) { + public void setReplyChannel(PollableChannel replyChannel) { this.replyChannel = replyChannel; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/AsyncMessageExchangeTemplate.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/AsyncMessageExchangeTemplate.java index 88abedc39e..156a9f5142 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/AsyncMessageExchangeTemplate.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/AsyncMessageExchangeTemplate.java @@ -22,7 +22,6 @@ import java.util.concurrent.FutureTask; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageExchangeTemplate; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; import org.springframework.util.Assert; @@ -78,7 +77,7 @@ public class AsyncMessageExchangeTemplate extends MessageExchangeTemplate { */ @Override @SuppressWarnings("unchecked") - public Message receive(final MessageSource source) { + public Message receive(final PollableSource source) { FutureTask> task = new FutureTask>(new Callable>() { public Message call() throws Exception { return AsyncMessageExchangeTemplate.super.receive(source); @@ -95,7 +94,7 @@ public class AsyncMessageExchangeTemplate extends MessageExchangeTemplate { * unless an exception is thrown by the executor. */ @Override - public boolean receiveAndForward(final MessageSource source, final MessageTarget target) { + public boolean receiveAndForward(final PollableSource source, final MessageTarget target) { this.taskExecutor.execute(new Runnable() { public void run() { AsyncMessageExchangeTemplate.super.receiveAndForward(source, target); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/BlockingSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/BlockingSource.java index c0b0484092..83e726e702 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/BlockingSource.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/BlockingSource.java @@ -21,7 +21,7 @@ package org.springframework.integration.message; * * @author Mark Fisher */ -public interface BlockingSource extends MessageSource { +public interface BlockingSource extends PollableSource { /** * Receive a message, blocking indefinitely if necessary. diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageExchangeTemplate.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageExchangeTemplate.java index 7abc0c6385..0fb818cafe 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageExchangeTemplate.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageExchangeTemplate.java @@ -27,7 +27,6 @@ import org.springframework.integration.message.BlockingSource; import org.springframework.integration.message.BlockingTarget; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; @@ -36,9 +35,9 @@ import org.springframework.transaction.support.TransactionTemplate; /** * This is the central class for invoking message exchange operations - * across {@link MessageSource}s and {@link MessageTarget}s. It supports + * across {@link PollableSource}s and {@link MessageTarget}s. It supports * one-way send and receive calls as well as request/reply. Additionally, - * the {@link #receiveAndForward(MessageSource, MessageTarget)} method + * the {@link #receiveAndForward(PollableSource, MessageTarget)} method * plays the role of a polling-consumer while actually sending any * received message to an event-driven consumer. * @@ -167,7 +166,7 @@ public class MessageExchangeTemplate implements InitializingBean { return this.doSendAndReceive(request, target); } - public Message receive(final MessageSource source) { + public Message receive(final PollableSource source) { TransactionTemplate txTemplate = this.getTransactionTemplate(); if (txTemplate != null) { return (Message) txTemplate.execute(new TransactionCallback() { @@ -179,7 +178,7 @@ public class MessageExchangeTemplate implements InitializingBean { return this.doReceive(source); } - public boolean receiveAndForward(final MessageSource source, final MessageTarget target) { + public boolean receiveAndForward(final PollableSource source, final MessageTarget target) { TransactionTemplate txTemplate = this.getTransactionTemplate(); if (txTemplate != null) { return (Boolean) txTemplate.execute(new TransactionCallback() { @@ -203,7 +202,7 @@ public class MessageExchangeTemplate implements InitializingBean { return sent; } - private Message doReceive(MessageSource source) { + private Message doReceive(PollableSource source) { long timeout = this.receiveTimeout; Message message = (timeout >= 0 && source instanceof BlockingSource) ? ((BlockingSource) source).receive(timeout) @@ -223,7 +222,7 @@ public class MessageExchangeTemplate implements InitializingBean { return this.doReceive(returnAddress); } - private boolean doReceiveAndForward(MessageSource source, MessageTarget target) { + private boolean doReceiveAndForward(PollableSource source, MessageTarget target) { Message message = this.doReceive(source); if (message == null) { return false; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageSource.java index d8aa06a17d..8070d77cf7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageSource.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MessageSource.java @@ -23,9 +23,4 @@ package org.springframework.integration.message; */ public interface MessageSource { - /** - * Retrieve a message from this source or null if no message is available. - */ - Message receive(); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/MethodInvokingSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/MethodInvokingSource.java index ff75e9d9b2..42d47f9205 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/MethodInvokingSource.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/MethodInvokingSource.java @@ -31,7 +31,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class MethodInvokingSource implements MessageSource, InitializingBean { +public class MethodInvokingSource implements PollableSource, InitializingBean { private Object object; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/PollableSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/PollableSource.java new file mode 100644 index 0000000000..df9c5f7d24 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/PollableSource.java @@ -0,0 +1,31 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.message; + +/** + * Base interface for any source of {@link Message Messages} that can be polled. + * + * @author Mark Fisher + */ +public interface PollableSource extends MessageSource { + + /** + * Retrieve a message from this source or null if no message is available. + */ + Message receive(); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java similarity index 88% rename from org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java rename to org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java index bd369dcdbd..7722d061c9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/Subscribable.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java @@ -17,11 +17,11 @@ package org.springframework.integration.message; /** - * Interface for any component that accepts subscribers. + * Interface for any source of messages that accepts subscribers. * * @author Mark Fisher */ -public interface Subscribable { +public interface SubscribableSource extends MessageSource { /** * Register a {@link MessageTarget} as a subscriber to this source. diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aop/MessagePublishingInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aop/MessagePublishingInterceptorTests.java index 66a27bfe2b..26992dbe2a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aop/MessagePublishingInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aop/MessagePublishingInterceptorTests.java @@ -23,7 +23,6 @@ import static org.junit.Assert.assertNull; import org.junit.Test; import org.springframework.aop.framework.ProxyFactory; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -34,7 +33,7 @@ public class MessagePublishingInterceptorTests { @Test public void testNonNullReturnValuePublishedWithDefaultChannel() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor(); interceptor.setDefaultChannel(channel); TestService proxy = (TestService) this.createProxy(new TestServiceImpl("hello world"), interceptor); @@ -46,7 +45,7 @@ public class MessagePublishingInterceptorTests { @Test public void testNullReturnValueNotPublished() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor(); interceptor.setDefaultChannel(channel); TestService proxy = (TestService) this.createProxy(new TestServiceImpl(null), interceptor); @@ -56,7 +55,7 @@ public class MessagePublishingInterceptorTests { @Test public void testVoidReturnValueNotPublished() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); MessagePublishingInterceptor interceptor = new MessagePublishingInterceptor(); interceptor.setDefaultChannel(channel); TestService proxy = (TestService) this.createProxy(new TestServiceImpl(null), interceptor); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationAdvisorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationAdvisorTests.java index 3f47290dc4..ece0aed1cf 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationAdvisorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationAdvisorTests.java @@ -26,7 +26,6 @@ import org.springframework.aop.framework.ProxyFactory; import org.springframework.integration.annotation.Publisher; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.DefaultChannelRegistry; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; @@ -37,7 +36,7 @@ public class PublisherAnnotationAdvisorTests { @Test public void testPublisherAnnotation() { - final MessageChannel channel = new QueueChannel(); + final QueueChannel channel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("testChannel", channel); PublisherAnnotationAdvisor advisor = new PublisherAnnotationAdvisor(channelRegistry); @@ -50,7 +49,7 @@ public class PublisherAnnotationAdvisorTests { @Test public void testNoPublisherAnnotation() { - final MessageChannel channel = new QueueChannel(); + final QueueChannel channel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("testChannel", channel); PublisherAnnotationAdvisor advisor = new PublisherAnnotationAdvisor(channelRegistry); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationPostProcessorTests.java index a200325a35..c5c505e65e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/aop/PublisherAnnotationPostProcessorTests.java @@ -22,7 +22,7 @@ import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; /** @@ -36,7 +36,7 @@ public class PublisherAnnotationPostProcessorTests { "publisherAnnotationPostProcessorTests.xml", this.getClass()); ITestBean testBean = (ITestBean) context.getBean("testBean"); testBean.test(); - MessageChannel channel = (MessageChannel) context.getBean("testChannel"); + PollableChannel channel = (PollableChannel) context.getBean("testChannel"); Message result = channel.receive(); assertEquals("test", result.getPayload()); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 8f59d67b97..311736ab3d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dispatcher.PublishSubscribeChannel; import org.springframework.integration.endpoint.SourceEndpoint; @@ -38,6 +39,7 @@ import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.message.StringMessage; import org.springframework.integration.scheduling.PollingSchedule; @@ -49,8 +51,8 @@ public class DefaultMessageBusTests { @Test public void testRegistrationWithInputChannelReference() { DefaultMessageBus bus = new DefaultMessageBus(); - MessageChannel sourceChannel = new QueueChannel(); - MessageChannel targetChannel = new QueueChannel(); + QueueChannel sourceChannel = new QueueChannel(); + QueueChannel targetChannel = new QueueChannel(); bus.registerChannel("sourceChannel", sourceChannel); Message message = MessageBuilder.fromPayload("test") .setReturnAddress("targetChannel").build(); @@ -71,8 +73,8 @@ public class DefaultMessageBusTests { @Test public void testRegistrationWithInputChannelName() { MessageBus bus = new DefaultMessageBus(); - MessageChannel sourceChannel = new QueueChannel(); - MessageChannel targetChannel = new QueueChannel(); + QueueChannel sourceChannel = new QueueChannel(); + QueueChannel targetChannel = new QueueChannel(); bus.registerChannel("sourceChannel", sourceChannel); Message message = MessageBuilder.fromPayload("test") .setReturnAddress("targetChannel").build(); @@ -93,9 +95,9 @@ public class DefaultMessageBusTests { @Test public void testChannelsWithoutHandlers() { MessageBus bus = new DefaultMessageBus(); - MessageChannel sourceChannel = new QueueChannel(); + QueueChannel sourceChannel = new QueueChannel(); sourceChannel.send(new StringMessage("test")); - MessageChannel targetChannel = new QueueChannel(); + QueueChannel targetChannel = new QueueChannel(); bus.registerChannel("sourceChannel", sourceChannel); bus.registerChannel("targetChannel", targetChannel); bus.start(); @@ -108,9 +110,9 @@ public class DefaultMessageBusTests { public void testAutodetectionWithApplicationContext() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("messageBusTests.xml", this.getClass()); context.start(); - MessageChannel sourceChannel = (MessageChannel) context.getBean("sourceChannel"); + PollableChannel sourceChannel = (PollableChannel) context.getBean("sourceChannel"); sourceChannel.send(new GenericMessage("test")); - MessageChannel targetChannel = (MessageChannel) context.getBean("targetChannel"); + PollableChannel targetChannel = (PollableChannel) context.getBean("targetChannel"); MessageBus bus = (MessageBus) context.getBean("bus"); bus.start(); Message result = targetChannel.receive(1000); @@ -198,7 +200,7 @@ public class DefaultMessageBusTests { bus.registerEndpoint(sourceEndpoint); bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); - Message message = bus.getErrorChannel().receive(100); + Message message = ((PollableChannel) bus.getErrorChannel()).receive(5000); assertNotNull("message should not be null", message); assertTrue(message instanceof ErrorMessage); assertEquals("intentional test failure", ((ErrorMessage) message).getPayload().getMessage()); @@ -245,7 +247,7 @@ public class DefaultMessageBusTests { } - private static class FailingSource implements MessageSource { + private static class FailingSource implements PollableSource { private CountDownLatch latch; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index baae295d65..d190e03e36 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -43,7 +43,7 @@ public class DirectChannelSubscriptionTests { private MessageChannel sourceChannel = new DirectChannel(); - private MessageChannel targetChannel = new ThreadLocalChannel(); + private ThreadLocalChannel targetChannel = new ThreadLocalChannel(); @Before diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/ChannelPurgerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/ChannelPurgerTests.java index 19194069c1..38edf5fe84 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/ChannelPurgerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/ChannelPurgerTests.java @@ -35,7 +35,7 @@ public class ChannelPurgerTests { @Test public void testPurgeAllWithoutSelector() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); channel.send(new StringMessage("test1")); channel.send(new StringMessage("test2")); channel.send(new StringMessage("test3")); @@ -47,7 +47,7 @@ public class ChannelPurgerTests { @Test public void testPurgeAllWithSelector() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); channel.send(new StringMessage("test1")); channel.send(new StringMessage("test2")); channel.send(new StringMessage("test3")); @@ -63,7 +63,7 @@ public class ChannelPurgerTests { @Test public void testPurgeNoneWithSelector() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); channel.send(new StringMessage("test1")); channel.send(new StringMessage("test2")); channel.send(new StringMessage("test3")); @@ -81,7 +81,7 @@ public class ChannelPurgerTests { @Test public void testPurgeSubsetWithSelector() { - MessageChannel channel = new QueueChannel(); + QueueChannel channel = new QueueChannel(); channel.send(new StringMessage("test1")); channel.send(new StringMessage("test2")); channel.send(new StringMessage("test3")); @@ -100,8 +100,8 @@ public class ChannelPurgerTests { @Test public void testMultipleChannelsWithNoSelector() { - MessageChannel channel1 = new QueueChannel(); - MessageChannel channel2 = new QueueChannel(); + QueueChannel channel1 = new QueueChannel(); + QueueChannel channel2 = new QueueChannel(); channel1.send(new StringMessage("test1")); channel1.send(new StringMessage("test2")); channel2.send(new StringMessage("test1")); @@ -115,8 +115,8 @@ public class ChannelPurgerTests { @Test public void testMultipleChannelsWithSelector() { - MessageChannel channel1 = new QueueChannel(); - MessageChannel channel2 = new QueueChannel(); + QueueChannel channel1 = new QueueChannel(); + QueueChannel channel2 = new QueueChannel(); channel1.send(new StringMessage("test1")); channel1.send(new StringMessage("test2")); channel1.send(new StringMessage("test3")); @@ -142,8 +142,8 @@ public class ChannelPurgerTests { @Test public void testPurgeNoneWithSelectorAndMultipleChannels() { - MessageChannel channel1 = new QueueChannel(); - MessageChannel channel2 = new QueueChannel(); + QueueChannel channel1 = new QueueChannel(); + QueueChannel channel2 = new QueueChannel(); channel1.send(new StringMessage("test1")); channel1.send(new StringMessage("test2")); channel2.send(new StringMessage("test1")); @@ -163,13 +163,13 @@ public class ChannelPurgerTests { @Test(expected=IllegalArgumentException.class) public void testNullChannel() { - MessageChannel channel = null; + QueueChannel channel = null; new ChannelPurger(channel); } @Test(expected=IllegalArgumentException.class) public void testEmptyChannelArray() { - MessageChannel[] channels = new MessageChannel[0]; + QueueChannel[] channels = new QueueChannel[0]; new ChannelPurger(channels); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java index 2b0f876e08..b50b72cb34 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/ChannelParserTests.java @@ -31,6 +31,7 @@ import org.springframework.beans.FatalBeanException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.TestChannelInterceptor; import org.springframework.integration.dispatcher.PublishSubscribeChannel; @@ -140,7 +141,7 @@ public class ChannelParserTests { public void testChannelInteceptors() { ApplicationContext context = new ClassPathXmlApplicationContext( "channelInterceptorParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("channel"); + PollableChannel channel = (PollableChannel) context.getBean("channel"); TestChannelInterceptor interceptor = (TestChannelInterceptor) context.getBean("interceptor"); assertEquals(0, interceptor.getSendCount()); channel.send(new StringMessage("test")); @@ -154,7 +155,7 @@ public class ChannelParserTests { public void testPriorityChannelWithDefaultComparator() { ApplicationContext context = new ClassPathXmlApplicationContext( "priorityChannelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("priorityChannelWithDefaultComparator"); + PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithDefaultComparator"); Message lowPriorityMessage = MessageBuilder.fromPayload("low") .setPriority(MessagePriority.LOW).build(); Message midPriorityMessage = MessageBuilder.fromPayload("mid") @@ -176,7 +177,7 @@ public class ChannelParserTests { public void testPriorityChannelWithCustomComparator() { ApplicationContext context = new ClassPathXmlApplicationContext( "priorityChannelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("priorityChannelWithCustomComparator"); + PollableChannel channel = (PollableChannel) context.getBean("priorityChannelWithCustomComparator"); channel.send(new StringMessage("C")); channel.send(new StringMessage("A")); channel.send(new StringMessage("D")); @@ -195,7 +196,7 @@ public class ChannelParserTests { public void testPriorityChannelWithIntegerDatatypeEnforced() { ApplicationContext context = new ClassPathXmlApplicationContext( "priorityChannelParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("integerOnlyPriorityChannel"); + PollableChannel channel = (PollableChannel) context.getBean("integerOnlyPriorityChannel"); channel.send(new GenericMessage(3)); channel.send(new GenericMessage(2)); channel.send(new GenericMessage(1)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSource.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSource.java index 6ed3aa5a0a..ad5d36af84 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSource.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSource.java @@ -17,13 +17,13 @@ package org.springframework.integration.channel.config; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher */ -public class TestSource implements MessageSource { +public class TestSource implements PollableSource { private final String text; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java index 0ec914431f..14a205d760 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/factory/StubChannel.java @@ -18,14 +18,14 @@ package org.springframework.integration.channel.factory; import java.util.List; -import org.springframework.integration.channel.AbstractMessageChannel; +import org.springframework.integration.channel.AbstractPollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.selector.MessageSelector; /** * @author Marius Bogoevici */ -public class StubChannel extends AbstractMessageChannel { +public class StubChannel extends AbstractPollableChannel { @Override protected Message doReceive(long timeout) { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 49ec17ab0e..9e2486c3f4 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -29,6 +29,7 @@ import org.springframework.beans.factory.BeanCreationException; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.HandlerEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -114,7 +115,7 @@ public class AggregatorParserTests { for (Message message : outboundMessages) { addingAggregator.handle(message); } - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message response = outputChannel.receive(); Assert.assertEquals(6l, response.getPayload()); } @@ -144,7 +145,7 @@ public class AggregatorParserTests { aggregatorWithPojoCompletionStrategy.handle(createMessage(1l, "id1", 0 , 0, null)); aggregatorWithPojoCompletionStrategy.handle(createMessage(2l, "id1", 0 , 0, null)); aggregatorWithPojoCompletionStrategy.handle(createMessage(3l, "id1", 0 , 0, null)); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message reply = outputChannel.receive(0); Assert.assertNull(reply); aggregatorWithPojoCompletionStrategy.handle(createMessage(5l, "id1", 0 , 0, null)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index 07ec1faff1..5dfaf6ba8a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -27,6 +27,7 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -71,7 +72,7 @@ public class EndpointParserTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithHandlerChainElement.xml", this.getClass()); MessageChannel channel = (MessageChannel) context.getBean("testChannel"); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel"); channel.send(new StringMessage("test")); Message reply = replyChannel.receive(500); assertNotNull(reply); @@ -83,7 +84,7 @@ public class EndpointParserTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); MessageTarget endpoint = (MessageTarget) context.getBean("endpoint"); - MessageChannel replyChannel = new QueueChannel(); + QueueChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); assertTrue(endpoint.send(message)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index eb89835092..c82db7213b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -27,6 +27,7 @@ import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.router.ResequencingMessageHandler; @@ -49,7 +50,7 @@ public class ResequencerParserTests { public void testResequencing() { ResequencingMessageHandler resequencingHandler = (ResequencingMessageHandler) context .getBean("defaultResequencer"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); List> outboundMessages = new ArrayList>(); outboundMessages.add(createMessage("123", "id1", 3, 3, outputChannel)); outboundMessages.add(createMessage("789", "id1", 3, 1, outputChannel)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java index ec928c88d4..e8da3a6006 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestSource.java @@ -17,13 +17,14 @@ package org.springframework.integration.config; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher */ -public class TestSource implements MessageSource { +@SuppressWarnings("unchecked") +public class TestSource implements PollableSource { public Message receive() { return new StringMessage("test"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index e9629b6757..84b6bc6efc 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -52,6 +52,7 @@ import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.endpoint.HandlerEndpoint; @@ -101,7 +102,7 @@ public class MessagingAnnotationPostProcessorTests { ApplicationContext context = new ClassPathXmlApplicationContext( "handlerAnnotationPostProcessorTests.xml", this.getClass()); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); inputChannel.send(new StringMessage("foo")); Message reply = outputChannel.receive(1000); assertEquals("hello foo", reply.getPayload()); @@ -112,7 +113,7 @@ public class MessagingAnnotationPostProcessorTests { AbstractApplicationContext context = new ClassPathXmlApplicationContext("simpleAnnotatedEndpointTests.xml", this.getClass()); context.start(); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -126,7 +127,7 @@ public class MessagingAnnotationPostProcessorTests { context.start(); ChannelRegistry channelRegistry = (ChannelRegistry) context.getBean("bus"); MessageChannel inputChannel = channelRegistry.lookupChannel("inputChannel"); - MessageChannel outputChannel = channelRegistry.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) channelRegistry.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -138,7 +139,7 @@ public class MessagingAnnotationPostProcessorTests { AbstractApplicationContext context = new ClassPathXmlApplicationContext("messageParameterAnnotatedEndpointTests.xml", this.getClass()); context.start(); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -150,7 +151,7 @@ public class MessagingAnnotationPostProcessorTests { AbstractApplicationContext context = new ClassPathXmlApplicationContext("typeConvertingEndpointTests.xml", this.getClass()); context.start(); MessageChannel inputChannel = (MessageChannel) context.getBean("inputChannel"); - MessageChannel outputChannel = (MessageChannel) context.getBean("outputChannel"); + PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); inputChannel.send(new StringMessage("123")); Message message = outputChannel.receive(1000); assertEquals(246, message.getPayload()); @@ -226,7 +227,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.postProcessAfterInitialization(proxy, "proxy"); messageBus.start(); MessageChannel inputChannel = messageBus.lookupChannel("inputChannel"); - MessageChannel outputChannel = messageBus.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -241,7 +242,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass"); messageBus.start(); MessageChannel inputChannel = messageBus.lookupChannel("inputChannel"); - MessageChannel outputChannel = messageBus.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -258,7 +259,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.postProcessAfterInitialization(proxy, "proxy"); messageBus.start(); MessageChannel inputChannel = messageBus.lookupChannel("inputChannel"); - MessageChannel outputChannel = messageBus.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); @@ -268,7 +269,7 @@ public class MessagingAnnotationPostProcessorTests { public void testMessageEndpointAnnotationInheritedFromInterface() { MessageBus messageBus = new DefaultMessageBus(); MessageChannel inputChannel = new QueueChannel(); - MessageChannel outputChannel = new QueueChannel(); + QueueChannel outputChannel = new QueueChannel(); messageBus.registerChannel("inputChannel", inputChannel); messageBus.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); @@ -289,7 +290,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); messageBus.start(); MessageChannel inputChannel = messageBus.lookupChannel("inputChannel"); - MessageChannel outputChannel = messageBus.lookupChannel("outputChannel"); + PollableChannel outputChannel = (PollableChannel) messageBus.lookupChannel("outputChannel"); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); assertEquals("test-ABC", message.getPayload()); @@ -299,7 +300,7 @@ public class MessagingAnnotationPostProcessorTests { public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() { MessageBus messageBus = new DefaultMessageBus(); MessageChannel inputChannel = new QueueChannel(); - MessageChannel outputChannel = new QueueChannel(); + QueueChannel outputChannel = new QueueChannel(); messageBus.registerChannel("inputChannel", inputChannel); messageBus.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelParserTests.java index b650a272c8..4fbf5c8ce3 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelParserTests.java @@ -17,38 +17,22 @@ package org.springframework.integration.dispatcher; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.StringMessage; /** * @author Mark Fisher */ public class DirectChannelParserTests { - @Test - public void testReceivesNullFromChannelWithoutSource() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "directChannelParserTests.xml", DirectChannelParserTests.class); - DirectChannel channel = (DirectChannel) context.getBean("channelWithoutSource"); - assertNull(channel.receive()); - } - @Test public void testReceivesMessageFromChannelWithSource() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "directChannelParserTests.xml", DirectChannelParserTests.class); - DirectChannel channel = (DirectChannel) context.getBean("channelWithSource"); - assertFalse(channel.send(new StringMessage("test"))); - Message reply = channel.receive(); - assertNotNull(reply); - assertEquals("foo", reply.getPayload()); + Object channel = context.getBean("channel"); + assertEquals(DirectChannel.class, channel.getClass()); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelTests.java index 5b9ae58d83..d8915f9897 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/DirectChannelTests.java @@ -17,19 +17,14 @@ package org.springframework.integration.dispatcher; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageBuilder; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.message.StringMessage; @@ -38,9 +33,6 @@ import org.springframework.integration.message.StringMessage; */ public class DirectChannelTests { - private static final String HANDLER_THREAD = "handler-thread"; - - @Test public void testSend() { DirectChannel channel = new DirectChannel(); @@ -67,63 +59,6 @@ public class DirectChannelTests { assertEquals("test-thread", target.threadName); } - @Test - public void testReceive() { - DirectChannel channel = new DirectChannel(new MessageSource() { - public Message receive() { - return new StringMessage("foo"); - } - }); - Message message = channel.receive(); - assertNotNull(message); - assertNotNull(message.getPayload()); - assertEquals(String.class, message.getPayload().getClass()); - assertEquals("foo", message.getPayload()); - } - - @Test - public void testReceiveWithMessageResult() { - DirectChannel channel = new DirectChannel(new MessageReturningTestSource("foo")); - Message message = channel.receive(); - assertNotNull(message); - assertNotNull(message.getPayload()); - assertEquals(String.class, message.getPayload().getClass()); - assertEquals("foo", message.getPayload()); - String handlerThreadName = message.getHeaders().get(HANDLER_THREAD, String.class); - assertEquals(Thread.currentThread().getName(), handlerThreadName); - } - - @Test - public void testReceiveInSeparateThread() throws InterruptedException { - final DirectChannel channel = new DirectChannel(new MessageReturningTestSource("foo")); - final SynchronousQueue> messageHolder = new SynchronousQueue>(); - new Thread(new Runnable() { - public void run() { - Message message = channel.receive(); - assertNotNull(message); - try { - messageHolder.put(message); - } - catch (InterruptedException e) { - // will fail after timeout below - } - } - }, "test-thread").start(); - Message message = messageHolder.poll(1000, TimeUnit.MILLISECONDS); - assertNotNull(message); - assertNotNull(message.getPayload()); - assertEquals(String.class, message.getPayload().getClass()); - assertEquals("foo", message.getPayload()); - String handlerThreadName = message.getHeaders().get(HANDLER_THREAD, String.class); - assertEquals("test-thread", handlerThreadName); - } - - @Test - public void testReceiveWithNoSource() { - DirectChannel channel = new DirectChannel(); - assertNull(channel.receive()); - } - private static class ThreadNameExtractingTestTarget implements MessageTarget { @@ -149,20 +84,4 @@ public class DirectChannelTests { } } - - private static class MessageReturningTestSource implements MessageSource { - - private final String messageText; - - - MessageReturningTestSource(String messageText) { - this.messageText = messageText; - } - - public Message receive() { - return MessageBuilder.fromPayload(messageText) - .setHeader(HANDLER_THREAD, Thread.currentThread().getName()).build(); - } - } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/directChannelParserTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/directChannelParserTests.xml index c3eb918fd8..726c248263 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/directChannelParserTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/directChannelParserTests.xml @@ -7,12 +7,6 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd"> - + - - - - - - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java index e45fa2f531..721238ea72 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java @@ -30,7 +30,6 @@ import org.junit.Test; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.DefaultChannelRegistry; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.TestHandlers; @@ -48,7 +47,7 @@ public class HandlerEndpointTests { @Test public void testDefaultReplyChannel() throws Exception { - MessageChannel replyChannel = new QueueChannel(); + QueueChannel replyChannel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("replyChannel", replyChannel); MessageHandler handler = new MessageHandler() { @@ -67,7 +66,7 @@ public class HandlerEndpointTests { @Test public void testExplicitReplyChannel() throws Exception { - final MessageChannel replyChannel = new QueueChannel(); + final QueueChannel replyChannel = new QueueChannel(); MessageHandler handler = new MessageHandler() { public Message handle(Message message) { return new StringMessage("hello " + message.getPayload()); @@ -84,7 +83,7 @@ public class HandlerEndpointTests { @Test public void testExplicitReplyChannelName() throws Exception { - final MessageChannel replyChannel = new QueueChannel(); + final QueueChannel replyChannel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("replyChannel", replyChannel); MessageHandler handler = new MessageHandler() { @@ -104,8 +103,8 @@ public class HandlerEndpointTests { @Test public void testDynamicReplyChannel() throws Exception { - final MessageChannel replyChannel1 = new QueueChannel(); - final MessageChannel replyChannel2 = new QueueChannel(); + final QueueChannel replyChannel1 = new QueueChannel(); + final QueueChannel replyChannel2 = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("replyChannel2", replyChannel2); MessageHandler handler = new MessageHandler() { @@ -135,7 +134,7 @@ public class HandlerEndpointTests { @Test public void testHandlerReturnsNull() throws InterruptedException { - MessageChannel replyChannel = new QueueChannel(); + QueueChannel replyChannel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("replyChannel", replyChannel); final CountDownLatch latch = new CountDownLatch(1); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ReturnAddressTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ReturnAddressTests.java index 4828e50f55..a80251c2d3 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ReturnAddressTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ReturnAddressTests.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; @@ -37,7 +38,7 @@ public class ReturnAddressTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "returnAddressTests.xml", this.getClass()); MessageChannel channel1 = (MessageChannel) context.getBean("channel1WithOverride"); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel"); context.start(); Message message = MessageBuilder.fromPayload("*") .setReturnAddress("replyChannel").build(); @@ -52,7 +53,7 @@ public class ReturnAddressTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "returnAddressTests.xml", this.getClass()); MessageChannel channel1 = (MessageChannel) context.getBean("channel1"); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel"); context.start(); Message message = MessageBuilder.fromPayload("*") .setReturnAddress("replyChannel").build(); @@ -67,7 +68,7 @@ public class ReturnAddressTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "returnAddressTests.xml", this.getClass()); MessageChannel channel4 = (MessageChannel) context.getBean("channel4"); - MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); + PollableChannel replyChannel = (PollableChannel) context.getBean("replyChannel"); context.start(); StringMessage message = new StringMessage("*"); channel4.send(message); @@ -81,7 +82,7 @@ public class ReturnAddressTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "returnAddressTests.xml", this.getClass()); MessageChannel channel3 = (MessageChannel) context.getBean("channel3"); - MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel"); + PollableChannel errorChannel = (PollableChannel) context.getBean("customErrorChannel"); context.start(); StringMessage message = new StringMessage("*"); channel3.send(message); @@ -94,7 +95,7 @@ public class ReturnAddressTests { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "returnAddressTests.xml", this.getClass()); MessageChannel channel3 = (MessageChannel) context.getBean("channel3WithOverride"); - MessageChannel errorChannel = (MessageChannel) context.getBean("customErrorChannel"); + PollableChannel errorChannel = (PollableChannel) context.getBean("customErrorChannel"); context.start(); StringMessage message = new StringMessage("*"); channel3.send(message); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java index bfba29075f..f54bda1033 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java @@ -26,7 +26,7 @@ import org.junit.Test; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.PollableSource; /** * @author Mark Fisher @@ -47,7 +47,7 @@ public class SourceEndpointTests { } - private static class TestSource implements MessageSource { + private static class TestSource implements PollableSource { private String message; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptorTests.java index 067aa54c8c..4a3e0bae62 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptorTests.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -44,7 +45,7 @@ public class TransactionInterceptorTests { "transactionInterceptorTests.xml", this.getClass()); TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager"); MessageChannel input = (MessageChannel) context.getBean("goodInput"); - MessageChannel output = (MessageChannel) context.getBean("output"); + PollableChannel output = (PollableChannel) context.getBean("output"); assertEquals(0, txManager.getCommitCount()); assertEquals(0, txManager.getRollbackCount()); input.send(new StringMessage("test")); @@ -61,7 +62,7 @@ public class TransactionInterceptorTests { "transactionInterceptorTests.xml", this.getClass()); TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager"); MessageChannel input = (MessageChannel) context.getBean("badInput"); - MessageChannel output = (MessageChannel) context.getBean("output"); + PollableChannel output = (PollableChannel) context.getBean("output"); assertEquals(0, txManager.getCommitCount()); assertEquals(0, txManager.getRollbackCount()); input.send(new StringMessage("test")); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java index 590696af3e..acb6ad3d9c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java @@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; @@ -41,7 +41,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testRequestReplyWithAnonymousChannel() throws Exception { - final MessageChannel requestChannel = new QueueChannel(); + QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); proxyFactory.setRequestChannel(requestChannel); @@ -54,7 +54,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testOneWay() throws Exception { - final MessageChannel requestChannel = new QueueChannel(); + final QueueChannel requestChannel = new QueueChannel(); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); proxyFactory.setServiceInterface(TestService.class); proxyFactory.setRequestChannel(requestChannel); @@ -68,7 +68,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testSolicitResponse() throws Exception { - MessageChannel replyChannel = new QueueChannel(); + QueueChannel replyChannel = new QueueChannel(); replyChannel.send(new StringMessage("foo")); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); proxyFactory.setServiceInterface(TestService.class); @@ -82,7 +82,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testRequestReplyWithTypeConversion() throws Exception { - final MessageChannel requestChannel = new QueueChannel(); + final QueueChannel requestChannel = new QueueChannel(); new Thread(new Runnable() { public void run() { Message input = requestChannel.receive(); @@ -155,7 +155,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testMessageAsMethodArgument() throws Exception { - final MessageChannel requestChannel = new QueueChannel(); + QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); proxyFactory.setServiceInterface(TestService.class); @@ -168,7 +168,7 @@ public class GatewayProxyFactoryBeanTests { @Test public void testMessageAsReturnValue() throws Exception { - final MessageChannel requestChannel = new QueueChannel(); + final QueueChannel requestChannel = new QueueChannel(); new Thread(new Runnable() { public void run() { Message input = requestChannel.receive(); @@ -212,7 +212,7 @@ public class GatewayProxyFactoryBeanTests { } - private static void startResponder(final MessageChannel requestChannel) { + private static void startResponder(final PollableChannel requestChannel) { new Thread(new Runnable() { public void run() { Message input = requestChannel.receive(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java index e895ebcb48..bbfda67eab 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java @@ -32,6 +32,7 @@ import org.junit.Test; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageHeaders; @@ -47,7 +48,7 @@ public class SimpleMessagingGatewayTests { private MessageChannel requestChannel = createMock(MessageChannel.class); - private MessageChannel replyChannel = createMock(MessageChannel.class); + private PollableChannel replyChannel = createMock(PollableChannel.class); private Message messageMock = createMock(Message.class); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/GatewayParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/GatewayParserTests.java index 5f02688fe4..c802744214 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/GatewayParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/config/GatewayParserTests.java @@ -25,6 +25,7 @@ import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.gateway.TestService; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -40,7 +41,7 @@ public class GatewayParserTests { ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass()); TestService service = (TestService) context.getBean("oneWay"); service.oneWay("foo"); - MessageChannel channel = (MessageChannel) context.getBean("requestChannel"); + PollableChannel channel = (PollableChannel) context.getBean("requestChannel"); Message result = channel.receive(1000); assertEquals("foo", result.getPayload()); } @@ -48,7 +49,7 @@ public class GatewayParserTests { @Test public void testSolicitResponse() { ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass()); - MessageChannel channel = (MessageChannel) context.getBean("replyChannel"); + PollableChannel channel = (PollableChannel) context.getBean("replyChannel"); channel.send(new StringMessage("foo")); TestService service = (TestService) context.getBean("solicitResponse"); String result = service.solicitResponse(); @@ -58,7 +59,7 @@ public class GatewayParserTests { @Test public void testRequestReply() { ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass()); - MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel"); + PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel"); MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); this.startResponder(requestChannel, replyChannel); TestService service = (TestService) context.getBean("requestReply"); @@ -69,7 +70,7 @@ public class GatewayParserTests { @Test public void testRequestReplyWithMessageMapper() { ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass()); - MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel"); + PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel"); MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); this.startResponder(requestChannel, replyChannel); TestService service = (TestService) context.getBean("requestReplyWithMessageMapper"); @@ -80,7 +81,7 @@ public class GatewayParserTests { @Test public void testRequestReplyWithMessageCreator() { ApplicationContext context = new ClassPathXmlApplicationContext("gatewayParserTests.xml", this.getClass()); - MessageChannel requestChannel = (MessageChannel) context.getBean("requestChannel"); + PollableChannel requestChannel = (PollableChannel) context.getBean("requestChannel"); MessageChannel replyChannel = (MessageChannel) context.getBean("replyChannel"); this.startResponder(requestChannel, replyChannel); TestService service = (TestService) context.getBean("requestReplyWithMessageCreator"); @@ -89,7 +90,7 @@ public class GatewayParserTests { } - private void startResponder(final MessageChannel requestChannel, final MessageChannel replyChannel) { + private void startResponder(final PollableChannel requestChannel, final MessageChannel replyChannel) { Executors.newSingleThreadExecutor().execute(new Runnable() { public void run() { Message request = requestChannel.receive(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java index 295b1d4178..43643ca705 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java @@ -23,7 +23,6 @@ import org.junit.Test; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.DefaultChannelRegistry; -import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; @@ -117,7 +116,7 @@ public class CorrelationIdTests { adapter.setObject(new TestBean()); adapter.setMethodName("upperCase"); adapter.afterPropertiesSet(); - MessageChannel testChannel = new QueueChannel(); + QueueChannel testChannel = new QueueChannel(); ChannelRegistry channelRegistry = new DefaultChannelRegistry(); channelRegistry.registerChannel("testChannel", testChannel); SplitterMessageHandlerAdapter splitter = new SplitterMessageHandlerAdapter( diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java index 84456c5404..1405c92ef9 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java @@ -38,15 +38,15 @@ import org.springframework.integration.message.StringMessage; */ public class RootCauseErrorMessageRouterTests { - private MessageChannel illegalArgumentChannel = new QueueChannel(); + private QueueChannel illegalArgumentChannel = new QueueChannel(); - private MessageChannel runtimeExceptionChannel = new QueueChannel(); + private QueueChannel runtimeExceptionChannel = new QueueChannel(); - private MessageChannel messageHandlingExceptionChannel = new QueueChannel(); + private QueueChannel messageHandlingExceptionChannel = new QueueChannel(); - private MessageChannel messageDeliveryExceptionChannel = new QueueChannel(); + private QueueChannel messageDeliveryExceptionChannel = new QueueChannel(); - private MessageChannel defaultChannel = new QueueChannel(); + private QueueChannel defaultChannel = new QueueChannel(); @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/RouterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/RouterParserTests.java index e41af39ea2..566d4ea1d1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/RouterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/RouterParserTests.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -37,8 +38,8 @@ public class RouterParserTests { "routerParserTests.xml", this.getClass()); context.start(); MessageChannel input = (MessageChannel) context.getBean("input"); - MessageChannel output1 = (MessageChannel) context.getBean("output1"); - MessageChannel output2 = (MessageChannel) context.getBean("output2"); + PollableChannel output1 = (PollableChannel) context.getBean("output1"); + PollableChannel output2 = (PollableChannel) context.getBean("output2"); input.send(new StringMessage("1")); Message result1 = output1.receive(1000); assertEquals("1", result1.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterAggregatorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterAggregatorTests.java index 6bef057a66..902b9ef18d 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterAggregatorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterAggregatorTests.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -44,7 +45,7 @@ public class SplitterAggregatorTests { ApplicationContext context = new ClassPathXmlApplicationContext( "splitterAggregatorTests.xml", this.getClass()); MessageChannel inputChannel = (MessageChannel) context.getBean("numbers"); - MessageChannel outputChannel = (MessageChannel) context.getBean("results"); + PollableChannel outputChannel = (PollableChannel) context.getBean("results"); inputChannel.send(new GenericMessage(this.nextTen())); Message result1 = outputChannel.receive(1000); assertNotNull(result1); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterParserTests.java index c8563543b4..2c2f6fcd94 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/config/SplitterParserTests.java @@ -23,6 +23,7 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -37,7 +38,7 @@ public class SplitterParserTests { "splitterParserTests.xml", this.getClass()); context.start(); MessageChannel channel1 = (MessageChannel) context.getBean("channel1"); - MessageChannel channel2 = (MessageChannel) context.getBean("channel2"); + PollableChannel channel2 = (PollableChannel) context.getBean("channel2"); channel1.send(new StringMessage("this.is.a.test")); Message result1 = channel2.receive(1000); assertEquals("this", result1.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/transformer/MessageTransformingChannelInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/transformer/MessageTransformingChannelInterceptorTests.java index 1d97c9fa59..a0fa1e17bb 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/transformer/MessageTransformingChannelInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/transformer/MessageTransformingChannelInterceptorTests.java @@ -21,7 +21,6 @@ import static org.junit.Assert.*; import org.junit.Before; import org.junit.Test; -import org.springframework.integration.channel.AbstractMessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; @@ -32,7 +31,7 @@ import org.springframework.integration.message.StringMessage; */ public class MessageTransformingChannelInterceptorTests { - private AbstractMessageChannel channel; + private QueueChannel channel; private StringMessage message;