From 35e744e60a6700ca3fd1765c4ca9635eb1841e5b Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Sun, 7 Sep 2008 21:04:50 +0000 Subject: [PATCH] Added AbstractMessageConsumingEndpoint. MessageDispatchers now expect MessageConsumer instances as subscribers, and the MessageEndpoint no longer has a send() method or a getSource() method. All consumer endpoints now use 'inputChannel' as the property (instead of source). The MessageBus is less involved in endpoint activation now, since endpoints that need to poll a channel can create, configure, and schedule their own poller. --- .../adapter/stream/ByteStreamTarget.java | 9 +- .../adapter/stream/CharacterStreamTarget.java | 9 +- .../stream/config/ConsoleTargetParser.java | 4 +- .../adapter/stream/ByteStreamTargetTests.java | 4 +- .../stream/CharacterStreamTargetTests.java | 2 +- .../config/ConsoleTargetParserTests.java | 8 +- .../ws/config/WebServiceHandlerParser.java | 2 +- .../AbstractMessageBarrierEndpoint.java | 3 +- .../integration/bus/DefaultMessageBus.java | 95 +++++------ .../integration/channel/DirectChannel.java | 9 +- .../channel/PublishSubscribeChannel.java | 10 +- .../config/AbstractEndpointParser.java | 11 +- .../config/ChannelAdapterParser.java | 29 ++-- .../config/IntegrationNamespaceUtils.java | 83 ++++------ .../integration/config/ResequencerParser.java | 2 +- ...AbstractMethodAnnotationPostProcessor.java | 32 ++-- ...ChannelAdapterAnnotationPostProcessor.java | 20 +-- .../dispatcher/AbstractDispatcher.java | 24 +-- .../dispatcher/BroadcastingDispatcher.java | 10 +- .../dispatcher/SimpleDispatcher.java | 29 ++-- .../endpoint/AbstractEndpoint.java | 85 +++++----- .../endpoint/AbstractInOutEndpoint.java | 11 +- .../AbstractMessageConsumingEndpoint.java | 147 ++++++++++++++++++ .../integration/endpoint/AbstractPoller.java | 22 +-- .../integration/endpoint/ChannelPoller.java | 9 +- .../endpoint/InboundChannelAdapter.java | 83 +++++++--- .../integration/endpoint/MessageEndpoint.java | 6 - .../endpoint/OutboundChannelAdapter.java | 9 +- .../endpoint/ServiceActivatorEndpoint.java | 1 + .../integration/endpoint/SourcePoller.java | 37 +++-- .../gateway/SimpleMessagingGateway.java | 2 +- .../message/SubscribableSource.java | 10 +- .../integration/router/RouterEndpoint.java | 7 +- .../bus/DefaultMessageBusTests.java | 20 +-- .../bus/DirectChannelSubscriptionTests.java | 4 +- .../integration/bus/messageBusTests.xml | 2 +- .../channel/DirectChannelTests.java | 18 +-- .../config/TestSubscribableSource.java | 16 +- .../config/AggregatorParserTests.java | 12 +- .../config/EndpointParserTests.java | 11 +- ...MessagingAnnotationPostProcessorTests.java | 21 +-- .../BroadcastingDispatcherTests.java | 93 ++++++----- .../dispatcher/SimpleDispatcherTests.java | 108 ++++++++----- .../endpoint/ChannelPollerTests.java | 32 ++-- .../endpoint/MessagingBridgeTests.java | 64 -------- .../ServiceActivatorEndpointTests.java | 36 ++--- .../handler/CorrelationIdTests.java | 22 +-- .../handler/MessageFilterTests.java | 8 +- .../handler/MethodInvokingTargetTests.java | 2 +- .../message/MessageExchangeTemplateTests.java | 2 +- .../router/MethodInvokingRouterTests.java | 64 ++++---- .../router/MultiChannelRouterTests.java | 8 +- .../router/PayloadTypeRouterTests.java | 8 +- .../router/RecipientListRouterTests.java | 6 +- .../RootCauseErrorMessageRouterTests.java | 14 +- .../router/RouterEndpointTests.java | 22 ++- .../router/SingleChannelRouterTests.java | 11 +- 57 files changed, 758 insertions(+), 670 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java delete mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java index 0828ebe74f..641c693a8c 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/ByteStreamTarget.java @@ -23,7 +23,7 @@ import java.io.OutputStream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessagingException; @@ -32,7 +32,7 @@ import org.springframework.integration.message.MessagingException; * * @author Mark Fisher */ -public class ByteStreamTarget extends AbstractEndpoint { +public class ByteStreamTarget extends AbstractMessageConsumingEndpoint { private final Log logger = LogFactory.getLog(this.getClass()); @@ -54,13 +54,13 @@ public class ByteStreamTarget extends AbstractEndpoint { @Override - public boolean sendInternal(Message message) { + public void processMessage(Message message) { Object payload = message.getPayload(); if (payload == null) { if (logger.isWarnEnabled()) { logger.warn(this.getClass().getSimpleName() + " received null object"); } - return false; + return; } try { if (payload instanceof String) { @@ -74,7 +74,6 @@ public class ByteStreamTarget extends AbstractEndpoint { " only supports byte array and String-based messages"); } this.stream.flush(); - return true; } catch (IOException e) { throw new MessagingException("IO failure occurred in target", e); diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java index ccd310f83d..a5584c301c 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/CharacterStreamTarget.java @@ -27,7 +27,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.integration.ConfigurationException; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessagingException; import org.springframework.util.Assert; @@ -41,7 +41,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class CharacterStreamTarget extends AbstractEndpoint { +public class CharacterStreamTarget extends AbstractMessageConsumingEndpoint { private final Log logger = LogFactory.getLog(this.getClass()); @@ -118,13 +118,13 @@ public class CharacterStreamTarget extends AbstractEndpoint { } @Override - public boolean sendInternal(Message message) { + public void processMessage(Message message) { Object payload = message.getPayload(); if (payload == null) { if (logger.isWarnEnabled()) { logger.warn("target received null payload"); } - return false; + return; } try { if (payload instanceof String) { @@ -143,7 +143,6 @@ public class CharacterStreamTarget extends AbstractEndpoint { writer.newLine(); } writer.flush(); - return true; } catch (IOException e) { throw new MessagingException("IO failure occurred in target", e); diff --git a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java index 13550aea0d..464e63af73 100644 --- a/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java +++ b/org.springframework.integration.adapter/src/main/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParser.java @@ -72,10 +72,10 @@ public class ConsoleTargetParser extends AbstractSingleBeanDefinitionParser { } String channelName = element.getAttribute("channel"); if (StringUtils.hasText(channelName)) { - builder.addPropertyReference("source", channelName); + builder.addPropertyReference("inputChannel", channelName); } else { - builder.addPropertyReference("source", this.createDirectChannel(element, parserContext)); + builder.addPropertyReference("inputChannel", this.createDirectChannel(element, parserContext)); } } diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java index 86914b5045..cb26328b24 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamTargetTests.java @@ -51,7 +51,7 @@ public class ByteStreamTargetTests { public void testSingleByteArray() { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); - target.send(new GenericMessage(new byte[] {1,2,3})); + target.onMessage(new GenericMessage(new byte[] {1,2,3})); byte[] result = stream.toByteArray(); assertEquals(3, result.length); assertEquals(1, result[0]); @@ -63,7 +63,7 @@ public class ByteStreamTargetTests { public void testSingleString() { ByteArrayOutputStream stream = new ByteArrayOutputStream(); ByteStreamTarget target = new ByteStreamTarget(stream); - target.send(new StringMessage("foo")); + target.onMessage(new StringMessage("foo")); byte[] result = stream.toByteArray(); assertEquals(3, result.length); assertEquals("foo", new String(result)); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java index 4b5a08d7db..5199618ac0 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamTargetTests.java @@ -50,7 +50,7 @@ public class CharacterStreamTargetTests { public void testSingleString() { StringWriter writer = new StringWriter(); CharacterStreamTarget target = new CharacterStreamTarget(writer); - target.send(new StringMessage("foo")); + target.onMessage(new StringMessage("foo")); assertEquals("foo", writer.toString()); } diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParserTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParserTests.java index 66c82db6fe..67caf3843e 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParserTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/config/ConsoleTargetParserTests.java @@ -72,7 +72,7 @@ public class ConsoleTargetParserTests { Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding()); assertEquals(Charset.defaultCharset(), writerCharset); this.resetStreams(); - target.send(new StringMessage("foo")); + target.onMessage(new StringMessage("foo")); assertEquals("foo", out.toString()); assertEquals("", err.toString()); } @@ -92,7 +92,7 @@ public class ConsoleTargetParserTests { Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding()); assertEquals(Charset.forName("UTF-8"), writerCharset); this.resetStreams(); - target.send(new StringMessage("bar")); + target.onMessage(new StringMessage("bar")); assertEquals("bar", out.toString()); assertEquals("", err.toString()); } @@ -128,7 +128,7 @@ public class ConsoleTargetParserTests { Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding()); assertEquals(Charset.defaultCharset(), writerCharset); this.resetStreams(); - target.send(new StringMessage("bad")); + target.onMessage(new StringMessage("bad")); assertEquals("", out.toString()); assertEquals("bad", err.toString()); } @@ -148,7 +148,7 @@ public class ConsoleTargetParserTests { Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding()); assertEquals(Charset.defaultCharset(), writerCharset); this.resetStreams(); - target.send(new StringMessage("foo")); + target.onMessage(new StringMessage("foo")); assertEquals("foo\n", out.toString()); } diff --git a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java index d288797bff..380e022aec 100644 --- a/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java +++ b/org.springframework.integration.ws/src/main/java/org/springframework/integration/ws/config/WebServiceHandlerParser.java @@ -57,7 +57,7 @@ public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser } builder.addConstructorArgValue(uri); String inputChannel = element.getAttribute("input-channel"); - builder.addPropertyReference("source", inputChannel); + builder.addPropertyReference("inputChannel", inputChannel); String outputChannel = element.getAttribute("output-channel"); if (StringUtils.hasText(outputChannel)) { builder.addPropertyReference("outputChannel", outputChannel); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java index 4f38f627bc..e7915b44a4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/aggregator/AbstractMessageBarrierEndpoint.java @@ -151,7 +151,8 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractInOutEndpoi * Initialize this endpoint. */ @Override - protected void initialize() { + protected void initialize() throws Exception { + super.initialize(); this.trackedCorrelationIds = new ArrayBlockingQueue(this.trackedCorrelationIdCapacity); this.executor.scheduleWithFixedDelay(new ReaperTask(), this.reaperInterval, this.reaperInterval, TimeUnit.MILLISECONDS); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index e1653b3db4..14a2cbb288 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -44,18 +44,12 @@ import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.MessagePublishingErrorHandler; -import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.AbstractPoller; -import org.springframework.integration.endpoint.ChannelPoller; import org.springframework.integration.endpoint.DefaultEndpointRegistry; import org.springframework.integration.endpoint.EndpointRegistry; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.MessagingGateway; -import org.springframework.integration.message.MessageSource; -import org.springframework.integration.message.SubscribableSource; -import org.springframework.integration.scheduling.PollingSchedule; -import org.springframework.integration.scheduling.Schedule; import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.integration.scheduling.spi.ProviderTaskScheduler; import org.springframework.integration.scheduling.spi.SimpleScheduleServiceProvider; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; @@ -78,14 +72,10 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A private final EndpointRegistry endpointRegistry = new DefaultEndpointRegistry(); - private final Set pollers = new CopyOnWriteArraySet(); - - private volatile Schedule defaultPollerSchedule = new PollingSchedule(0); - - private final List lifecycleEndpoints = new CopyOnWriteArrayList(); - private final MessageBusInterceptorsList interceptors = new MessageBusInterceptorsList(); + private final Set lifecycleGateways = new CopyOnWriteArraySet(); + private volatile TaskScheduler taskScheduler; private volatile ApplicationContext applicationContext; @@ -263,39 +253,46 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } } + private void deactivateEndpoints() { + Set endpointNames = this.endpointRegistry.getEndpointNames(); + for (String name : endpointNames) { + MessageEndpoint endpoint = this.endpointRegistry.lookupEndpoint(name); + if (endpoint != null) { + this.deactivateEndpoint(endpoint); + } + } + } + private void activateEndpoint(MessageEndpoint endpoint) { Assert.notNull(endpoint, "'endpoint' must not be null"); if (endpoint instanceof ChannelRegistryAware) { ((ChannelRegistryAware) endpoint).setChannelRegistry(this); } - MessageSource source = endpoint.getSource(); - if (source == null) { - throw new ConfigurationException("endpoint '" + endpoint + "' has no source"); + if (endpoint instanceof TaskSchedulerAware) { + ((TaskSchedulerAware) endpoint).setTaskScheduler(this.taskScheduler); } - if (source instanceof SubscribableSource) { - ((SubscribableSource) source).subscribe(endpoint); - if (source instanceof AbstractPoller) { - AbstractPoller poller = (AbstractPoller) source; - this.pollers.add(poller); - this.taskScheduler.schedule(poller); - } - return; - } - else if (source instanceof PollableChannel) { - ChannelPoller poller = new ChannelPoller((PollableChannel) source, this.defaultPollerSchedule); - poller.subscribe(endpoint); - this.pollers.add(poller); - this.taskScheduler.schedule(poller); + if (endpoint instanceof Lifecycle) { + ((Lifecycle) endpoint).start(); } if (logger.isInfoEnabled()) { - logger.info("activated subscription to channel '" - + source + "' for endpoint '" + endpoint + "'"); + logger.info("activated endpoint '" + endpoint + "'"); } } + public void deactivateEndpoint(MessageEndpoint endpoint) { + Assert.notNull(endpoint, "'endpoint' must not be null"); + if (endpoint instanceof Lifecycle) { + ((Lifecycle) endpoint).stop(); + if (this.logger.isInfoEnabled()) { + logger.info("deactivated endpoint '" + endpoint + "'"); + } + } + } + + // TODO: once gateways are endpoints, remove this private void registerGateway(String name, MessagingGateway gateway) { if (gateway instanceof Lifecycle) { - this.lifecycleEndpoints.add((Lifecycle) gateway); + this.lifecycleGateways.add((Lifecycle) gateway); if (this.isRunning()) { ((Lifecycle) gateway).start(); } @@ -305,19 +302,6 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } } - public void deactivateEndpoint(MessageEndpoint endpoint) { - Assert.notNull(endpoint, "'endpoint' must not be null"); - for (AbstractPoller poller : this.pollers) { - boolean removed = ((AbstractPoller) poller).unsubscribe(endpoint); - if (removed && this.logger.isInfoEnabled()) { - logger.info("unsubscribed endpoint '" + endpoint + "' from poller '" + poller + "'"); - } - } - if (endpoint instanceof Lifecycle) { - ((Lifecycle) endpoint).stop(); - } - } - public boolean isRunning() { synchronized (this.lifecycleMonitor) { return this.running; @@ -335,14 +319,11 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A this.starting = true; synchronized (this.lifecycleMonitor) { this.activateEndpoints(); + for (Lifecycle gateway : this.lifecycleGateways) { + gateway.start(); + } this.taskScheduler.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel())); this.taskScheduler.start(); - for (Lifecycle endpoint : this.lifecycleEndpoints) { - endpoint.start(); - if (logger.isInfoEnabled()) { - logger.info("started endpoint '" + endpoint + "'"); - } - } } this.running = true; this.starting = false; @@ -358,14 +339,12 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A } this.interceptors.preStop(); synchronized (this.lifecycleMonitor) { + this.deactivateEndpoints(); + for (Lifecycle gateway : this.lifecycleGateways) { + gateway.stop(); + } this.running = false; this.taskScheduler.stop(); - for (Lifecycle endpoint : this.lifecycleEndpoints) { - endpoint.stop(); - if (logger.isInfoEnabled()) { - logger.info("stopped endpoint '" + endpoint + "'"); - } - } } this.interceptors.postStop(); if (logger.isInfoEnabled()) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java index ffd19390d5..6f1c490ced 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/DirectChannel.java @@ -19,6 +19,7 @@ package org.springframework.integration.channel; import org.springframework.integration.dispatcher.SimpleDispatcher; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.SubscribableSource; /** @@ -33,12 +34,12 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl private final SimpleDispatcher dispatcher = new SimpleDispatcher(); - public boolean subscribe(MessageEndpoint endpoint) { - return this.dispatcher.subscribe(endpoint); + public boolean subscribe(MessageConsumer consumer) { + return this.dispatcher.subscribe(consumer); } - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.dispatcher.unsubscribe(endpoint); + public boolean unsubscribe(MessageConsumer consumer) { + return this.dispatcher.unsubscribe(consumer); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java index 20c8f2e096..3f84e7e658 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java @@ -18,8 +18,8 @@ package org.springframework.integration.channel; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.dispatcher.BroadcastingDispatcher; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.SubscribableSource; /** @@ -48,12 +48,12 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S this.dispatcher.setApplySequence(applySequence); } - public boolean subscribe(MessageEndpoint endpoint) { - return this.dispatcher.subscribe(endpoint); + public boolean subscribe(MessageConsumer consumer) { + return this.dispatcher.subscribe(consumer); } - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.dispatcher.unsubscribe(endpoint); + public boolean unsubscribe(MessageConsumer consumer) { + return this.dispatcher.unsubscribe(consumer); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java index 4b1f09cb53..d190327066 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractEndpointParser.java @@ -89,12 +89,13 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio } Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); if (pollerElement != null) { - String pollerBeanName = IntegrationNamespaceUtils.parseChannelPoller(inputChannel, pollerElement, parserContext); - builder.addPropertyReference("source", pollerBeanName); - } - else { - builder.addPropertyReference("source", inputChannel); + IntegrationNamespaceUtils.configureSchedule(pollerElement, builder); + Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); + if (txElement != null) { + IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder); + } } + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, SELECTOR_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ChannelAdapterParser.java index 82df1b26e5..52a82ac0ee 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ChannelAdapterParser.java @@ -73,19 +73,21 @@ public class ChannelAdapterParser extends AbstractBeanDefinitionParser { source = BeanDefinitionReaderUtils.registerWithGeneratedName(invokerBuilder.getBeanDefinition(), parserContext.getRegistry()); } adapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(InboundChannelAdapter.class); - if (pollerElement != null) { - String pollerBeanName = IntegrationNamespaceUtils.parseSourcePoller(source, pollerElement, parserContext); - adapterBuilder.addPropertyReference("source", pollerBeanName); - } - else { - adapterBuilder.addPropertyReference("source", source); - } + adapterBuilder.addPropertyReference("source", source); if (StringUtils.hasText(channelName)) { adapterBuilder.addPropertyReference("channel", channelName); } else { adapterBuilder.addPropertyReference("channel", this.createDirectChannel(element, parserContext)); } + if (pollerElement != null) { + IntegrationNamespaceUtils.configureSchedule(pollerElement, adapterBuilder); + IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, pollerElement, "max-messages-per-poll"); + Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); + if (txElement != null) { + IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder); + } + } } else if (StringUtils.hasText(target)) { if (StringUtils.hasText(methodName)) { @@ -100,14 +102,17 @@ public class ChannelAdapterParser extends AbstractBeanDefinitionParser { if (!StringUtils.hasText(channelName)) { throw new ConfigurationException("outbound channel-adapter with a 'poller' requires a 'channel' to poll"); } - String pollerBeanName = IntegrationNamespaceUtils.parseChannelPoller(channelName, pollerElement, parserContext); - adapterBuilder.addPropertyReference("source", pollerBeanName); + IntegrationNamespaceUtils.configureSchedule(pollerElement, adapterBuilder); + Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); + if (txElement != null) { + IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder); + } } - else if (StringUtils.hasText(channelName)) { - adapterBuilder.addPropertyReference("source", channelName); + if (StringUtils.hasText(channelName)) { + adapterBuilder.addPropertyReference("inputChannel", channelName); } else { - adapterBuilder.addPropertyReference("source", + adapterBuilder.addPropertyReference("inputChannel", this.createDirectChannel(element, parserContext)); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java index 3dcdae7572..e505a1d1f5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java @@ -21,19 +21,15 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.core.Conventions; import org.springframework.integration.ConfigurationException; -import org.springframework.integration.endpoint.ChannelPoller; -import org.springframework.integration.endpoint.SourcePoller; import org.springframework.integration.scheduling.CronSchedule; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.integration.scheduling.Schedule; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.StringUtils; -import org.springframework.util.xml.DomUtils; /** * Shared utility methods for integration namespace parsers. @@ -138,71 +134,54 @@ public abstract class IntegrationNamespaceUtils { } /** - * Parse a "poller" element to create a ChannelPoller and return the bean name of the poller instance. + * Parse a "poller" element to create a Schedule and add it to the property values of the target builder. * - * @param channelBeanName the name of the PollableChannel bean - * @param element the "poller" element to parse - * @param parserContext the parserContext for registering a newly created bean definition - * @return the name of the ChannelPoller bean definition + * @param pollerElement the "poller" element to parse + * @param targetBuilder the builder that expects the "schedule" property */ - public static String parseChannelPoller(String channelBeanName, Element element, ParserContext parserContext) { - return parsePoller(channelBeanName, element, parserContext, true); - } - - /** - * Parse a "poller" element to create a SourcePoller and return the bean name of the poller instance. - * - * @param sourceBeanName the name of the PollableSource bean - * @param element the "poller" element to parse - * @param parserContext the parserContext for registering a newly created bean definition - * @return the name of the poller bean definition - */ - public static String parseSourcePoller(String sourceBeanName, Element element, ParserContext parserContext) { - return parsePoller(sourceBeanName, element, parserContext, false); - } - - private static String parsePoller(String sourceBeanName, Element element, ParserContext parserContext, boolean isChannel) { - Class beanClass = isChannel ? ChannelPoller.class : SourcePoller.class; - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClass); + public static void configureSchedule(Element pollerElement, BeanDefinitionBuilder targetBuilder) { Schedule schedule = null; - if (!(StringUtils.hasText(element.getAttribute("period")) ^ StringUtils.hasText(element.getAttribute("cron")))) { + if (!(StringUtils.hasText(pollerElement.getAttribute("period")) ^ StringUtils.hasText(pollerElement.getAttribute("cron")))) { throw new ConfigurationException("A element must define either a period " + "or a cron expression (but not both)"); } - if (StringUtils.hasText(element.getAttribute("period"))) { - Long period = Long.valueOf(element.getAttribute("period")); + if (StringUtils.hasText(pollerElement.getAttribute("period"))) { + Long period = Long.valueOf(pollerElement.getAttribute("period")); schedule = new PollingSchedule(period); - String initialDelay = element.getAttribute("initial-delay"); + String initialDelay = pollerElement.getAttribute("initial-delay"); if (StringUtils.hasText(initialDelay)) { ((PollingSchedule)schedule).setInitialDelay(Long.valueOf(initialDelay)); } - if ("true".equals(element.getAttribute("fixed-rate").toLowerCase())) { + if ("true".equals(pollerElement.getAttribute("fixed-rate").toLowerCase())) { ((PollingSchedule)schedule).setFixedRate(true); } else { ((PollingSchedule)schedule).setFixedRate(false); } } - if (StringUtils.hasText(element.getAttribute("cron"))) { - schedule = new CronSchedule(element.getAttribute("cron")); + if (StringUtils.hasText(pollerElement.getAttribute("cron"))) { + schedule = new CronSchedule(pollerElement.getAttribute("cron")); } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor"); - Element txElement = DomUtils.getChildElementByTagName(element, "transactional"); - if (txElement != null) { - builder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager")); - builder.addPropertyValue("propagationBehaviorName", - DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation")); - builder.addPropertyValue("isolationLevelName", - DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation")); - builder.addPropertyValue("transactionTimeout", txElement.getAttribute("timeout")); - builder.addPropertyValue("transactionReadOnly", txElement.getAttribute("read-only")); - } - builder.addConstructorArgReference(sourceBeanName); - builder.addConstructorArgValue(schedule); - setValueIfAttributeDefined(builder, element, "receive-timeout"); - setValueIfAttributeDefined(builder, element, "send-timeout"); - setValueIfAttributeDefined(builder, element, "max-messages-per-poll"); - return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); + targetBuilder.addPropertyValue("schedule", schedule); + } + + /** + * Parse a "transactional" element and configure the "transactionManager" and "transactionDefinition" + * properties for the target builder. + * + * @param txElement the "transactional" element to parse + * @param targetBuilder the builder that expects the "transactionManager" and "transactionDefinition" properties + */ + public static void configureTransactionAttributes(Element txElement, BeanDefinitionBuilder targetBuilder) { + targetBuilder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager")); + DefaultTransactionDefinition txDefinition = new DefaultTransactionDefinition(); + txDefinition.setPropagationBehaviorName( + DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation")); + txDefinition.setIsolationLevelName( + DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation")); + txDefinition.setTimeout(Integer.valueOf(txElement.getAttribute("timeout"))); + txDefinition.setReadOnly(txElement.getAttribute("read-only").equalsIgnoreCase("true")); + targetBuilder.addPropertyValue("transactionDefinition", txDefinition); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java index c10bf279e9..b6f2e529ba 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ResequencerParser.java @@ -51,7 +51,7 @@ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser { @Override protected void postProcess(BeanDefinitionBuilder builder, Element element) { - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE, "source"); + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 1f44446d21..9c501960a3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -28,7 +28,7 @@ import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.AbstractInOutEndpoint; -import org.springframework.integration.endpoint.ChannelPoller; +import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; @@ -87,22 +87,22 @@ public abstract class AbstractMethodAnnotationPostProcessor endpoints = new CopyOnWriteArraySet(); + protected final Set subscribers = new CopyOnWriteArraySet(); private volatile TaskExecutor taskExecutor; - public boolean subscribe(MessageEndpoint endpoint) { - return this.endpoints.add(endpoint); + public boolean subscribe(MessageConsumer consumer) { + return this.subscribers.add(consumer); } - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.endpoints.remove(endpoint); + public boolean unsubscribe(MessageConsumer consumer) { + return this.subscribers.remove(consumer); } /** - * Specify a {@link TaskExecutor} for invoking the endpoints. + * Specify a {@link TaskExecutor} for invoking the consumers. * If none is provided, the invocation will occur in the thread * that runs this polling dispatcher. */ @@ -61,15 +60,8 @@ public abstract class AbstractDispatcher implements MessageDispatcher { return this.taskExecutor; } - /** - * A convenience method for subclasses to send a Message to a single endpoint. - */ - protected final boolean sendMessageToEndpoint(Message message, MessageEndpoint endpoint) { - return endpoint.send(message); - } - public String toString() { - return this.getClass().getSimpleName() + " with endpoints: " + this.endpoints; + return this.getClass().getSimpleName() + " with subscribers: " + this.subscribers; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index ad38e0c18e..044ea55bd7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -17,9 +17,9 @@ package org.springframework.integration.dispatcher; import org.springframework.core.task.TaskExecutor; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageConsumer; /** * A broadcasting dispatcher implementation. It makes a best effort to @@ -45,8 +45,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher { public boolean dispatch(Message message) { int sequenceNumber = 1; - int sequenceSize = this.endpoints.size(); - for (final MessageEndpoint endpoint : this.endpoints) { + int sequenceSize = this.subscribers.size(); + for (final MessageConsumer consumer : this.subscribers) { final Message messageToSend = (!this.applySequence) ? message : MessageBuilder.fromMessage(message) .setSequenceNumber(sequenceNumber++) @@ -56,12 +56,12 @@ public class BroadcastingDispatcher extends AbstractDispatcher { if (executor != null) { executor.execute(new Runnable() { public void run() { - sendMessageToEndpoint(messageToSend, endpoint); + consumer.onMessage(messageToSend); } }); } else { - this.sendMessageToEndpoint(messageToSend, endpoint); + consumer.onMessage(messageToSend); } } return true; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java index bfdfd3ecb6..f1608d627d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/dispatcher/SimpleDispatcher.java @@ -16,50 +16,45 @@ package org.springframework.integration.dispatcher; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; /** * Basic implementation of {@link MessageDispatcher} that will attempt - * to send a {@link Message} to one of its endpoints. As soon as one - * of the endpoints accepts the Message, the dispatcher will return 'true'. + * to send a {@link Message} to one of its subscribers. As soon as one + * of the subscribers accepts the Message, the dispatcher will return 'true'. *

- * If the dispatcher has no endpoints, a {@link MessageDeliveryException} - * will be thrown. If all endpoints reject the Message, the dispatcher will - * throw a MessageRejectedException. If all endpoints return 'false' - * (e.g. due to a timeout), the dispatcher will return 'false'. + * If the dispatcher has no subscribers, a {@link MessageDeliveryException} + * will be thrown. If all subscribers reject the Message, the dispatcher will + * throw a MessageRejectedException. * * @author Mark Fisher */ public class SimpleDispatcher extends AbstractDispatcher { public boolean dispatch(Message message) { - if (this.endpoints.size() == 0) { + if (this.subscribers.size() == 0) { throw new MessageDeliveryException(message, "Dispatcher has no subscribers."); } int count = 0; int rejectedExceptionCount = 0; - for (MessageEndpoint endpoint : this.endpoints) { + for (MessageConsumer consumer : this.subscribers) { count++; try { - if (this.sendMessageToEndpoint(message, endpoint)) { - return true; - } - if (logger.isDebugEnabled()) { - logger.debug("Failed to send message to endpoint, continuing with other endpoints if available."); - } + consumer.onMessage(message); + return true; } catch (MessageRejectedException e) { rejectedExceptionCount++; if (logger.isDebugEnabled()) { - logger.debug("Endpoint '" + endpoint + "' rejected Message, continuing with other endpoints if available.", e); + logger.debug("Consumer '" + consumer + "' rejected Message, continuing with other subscribers if available.", e); } } } if (rejectedExceptionCount == count) { - throw new MessageRejectedException(message, "All of dispatcher's endpoints rejected Message."); + throw new MessageRejectedException(message, "All of dispatcher's subscribers rejected Message."); } return false; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 2b0773173a..dac4352330 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -24,31 +24,35 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; -import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageExchangeTemplate; -import org.springframework.integration.message.MessageHandlingException; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessagingException; -import org.springframework.integration.message.SubscribableSource; +import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.integration.util.ErrorHandler; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; /** * The base class for Message Endpoint implementations. * * @author Mark Fisher */ -public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegistryAware, BeanNameAware, InitializingBean { +public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegistryAware, TaskSchedulerAware, BeanNameAware, InitializingBean { protected final Log logger = LogFactory.getLog(this.getClass()); private volatile String name; - private MessageSource source; + private volatile ChannelRegistry channelRegistry; + + private volatile TaskScheduler taskScheduler; + + private volatile PlatformTransactionManager transactionManager; + + private volatile TransactionDefinition transactionDefinition; private volatile ErrorHandler errorHandler; - private volatile ChannelRegistry channelRegistry; - private final MessageExchangeTemplate messageExchangeTemplate = new MessageExchangeTemplate(); @@ -63,14 +67,6 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist this.name = name; } - public MessageSource getSource() { - return this.source; - } - - public void setSource(MessageSource source) { - this.source = source; - } - protected ChannelRegistry getChannelRegistry() { return this.channelRegistry; } @@ -79,6 +75,22 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist this.channelRegistry = channelRegistry; } + protected TaskScheduler getTaskScheduler() { + return this.taskScheduler; + } + + public void setTaskScheduler(TaskScheduler taskScheduler) { + this.taskScheduler = taskScheduler; + } + + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + public void setTransactionDefinition(TransactionDefinition transactionDefinition) { + this.transactionDefinition= transactionDefinition; + } + protected MessageExchangeTemplate getMessageExchangeTemplate() { return this.messageExchangeTemplate; } @@ -94,9 +106,6 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist } public final void afterPropertiesSet() { - if (this.source != null && (this.source instanceof SubscribableSource)) { - ((SubscribableSource) this.source).subscribe(this); - } try { this.initialize(); } @@ -114,31 +123,7 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist protected void initialize() throws Exception { } - public final boolean send(Message message) { - if (message == null || message.getPayload() == null) { - throw new IllegalArgumentException("Message and its payload must not be null"); - } - if (this.logger.isDebugEnabled()) { - this.logger.debug("endpoint '" + this + "' processing message: " + message); - } - try { - return this.sendInternal(message); - } - catch (Exception e) { - if (e instanceof MessagingException) { - this.handleException((MessagingException) e); - } - else { - this.handleException(new MessageHandlingException(message, - "failure occurred in endpoint '" + this.toString() + "'", e)); - } - return false; - } - } - - protected abstract boolean sendInternal(Message message); - - private void handleException(MessagingException exception) { + protected void handleException(MessagingException exception) { if (this.errorHandler == null) { if (this.logger.isWarnEnabled()) { this.logger.warn("exception occurred in endpoint '" + this.name + "'", exception); @@ -148,6 +133,18 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist this.errorHandler.handle(exception); } + protected final void configureTransactionSettingsForPoller(AbstractPoller poller) { + if (this.transactionManager != null) { + poller.setTransactionManager(this.transactionManager); + } + if (this.transactionDefinition != null) { + poller.setPropagationBehavior(this.transactionDefinition.getPropagationBehavior()); + poller.setIsolationLevel(this.transactionDefinition.getIsolationLevel()); + poller.setTransactionReadOnly(this.transactionDefinition.isReadOnly()); + poller.setTransactionTimeout(this.transactionDefinition.getTimeout()); + } + } + public String toString() { return (this.name != null) ? this.name : super.toString(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractInOutEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractInOutEndpoint.java index 9937bcd052..2032673e40 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractInOutEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractInOutEndpoint.java @@ -34,7 +34,7 @@ import org.springframework.integration.message.selector.MessageSelector; /** * @author Mark Fisher */ -public abstract class AbstractInOutEndpoint extends AbstractEndpoint { +public abstract class AbstractInOutEndpoint extends AbstractMessageConsumingEndpoint { private MessageChannel outputChannel; @@ -73,11 +73,11 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint { } @Override - protected boolean sendInternal(Message message) { + protected void processMessage(Message message) { for (EndpointInterceptor interceptor : this.interceptors) { message = interceptor.preHandle(message); if (message == null) { - return false; + return; } } if (!this.supports(message)) { @@ -89,7 +89,7 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint { throw new MessageHandlingException(message, "endpoint '" + this.getName() + " requires a reply, but no reply was received"); } - return true; + return; } Message reply = null; if (result instanceof Message && result.equals(message)) { @@ -106,10 +106,9 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint { boolean sent = this.sendReplyMessage(nextReply, replyChannel); sentAtLeastOne = (sentAtLeastOne || sent); } - return sentAtLeastOne; } else { - return this.sendReplyMessage(reply, replyChannel); + this.sendReplyMessage(reply, replyChannel); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java new file mode 100644 index 0000000000..a20ac41864 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractMessageConsumingEndpoint.java @@ -0,0 +1,147 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import org.springframework.context.Lifecycle; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; +import org.springframework.integration.message.MessageHandlingException; +import org.springframework.integration.message.MessagingException; +import org.springframework.integration.message.SubscribableSource; +import org.springframework.integration.scheduling.PollingSchedule; +import org.springframework.integration.scheduling.Schedule; + +/** + * The base class for Message Endpoint implementations that consume Messages. + * + * @author Mark Fisher + */ +public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint implements MessageConsumer, Lifecycle { + + private volatile MessageChannel inputChannel; + + private volatile Schedule schedule = new PollingSchedule(0); + + private volatile ChannelPoller poller; + + private volatile int maxMessagesPerPoll = -1; + + private volatile boolean initialized; + + private volatile boolean running; + + private final Object lifecycleMonitor = new Object(); + + + public void setInputChannel(MessageChannel inputChannel) { + this.inputChannel = inputChannel; + } + + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + if (this.poller != null) { + this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); + } + } + + public final boolean isRunning() { + return this.running; + } + + @Override + protected void initialize() throws Exception { + synchronized (this.lifecycleMonitor) { + if (this.inputChannel instanceof PollableChannel && this.poller == null) { + this.poller = new ChannelPoller((PollableChannel) this.inputChannel, this.schedule); + this.poller.setMaxMessagesPerPoll(this.maxMessagesPerPoll); + this.configureTransactionSettingsForPoller(this.poller); + this.poller.subscribe(this); + } + this.initialized = true; + } + } + + public final void start() { + synchronized (this.lifecycleMonitor) { + if (this.running) { + return; + } + if (!this.initialized) { + this.afterPropertiesSet(); + } + if (this.inputChannel == null) { + throw new ConfigurationException("failed to start endpoint, inputChannel is required"); + } + if (this.inputChannel instanceof SubscribableSource) { + ((SubscribableSource) inputChannel).subscribe(this); + } + else if (this.inputChannel instanceof PollableChannel) { + if (this.getTaskScheduler() == null) { + throw new ConfigurationException("failed to start endpoint, no taskScheduler available"); + } + this.getTaskScheduler().schedule(poller); + } + this.running = true; + } + } + + public final void stop() { + synchronized (this.lifecycleMonitor) { + if (!this.running) { + return; + } + if (this.inputChannel instanceof SubscribableSource) { + ((SubscribableSource) inputChannel).unsubscribe(this); + } + else if (this.poller != null) { + this.getTaskScheduler().cancel(poller, true); + } + this.running = false; + } + } + + public final void onMessage(Message message) { + if (message == null || message.getPayload() == null) { + throw new IllegalArgumentException("Message and its payload must not be null"); + } + if (this.logger.isDebugEnabled()) { + this.logger.debug("endpoint '" + this + "' processing message: " + message); + } + try { + this.processMessage(message); + } + catch (Exception e) { + if (e instanceof MessagingException) { + this.handleException((MessagingException) e); + } + else { + this.handleException(new MessageHandlingException(message, + "failure occurred in endpoint '" + this.toString() + "'", e)); + } + } + } + + protected abstract void processMessage(Message message); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java index 2594263707..646e1645d7 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPoller.java @@ -18,11 +18,11 @@ package org.springframework.integration.endpoint; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.TaskExecutor; -import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.SchedulableTask; import org.springframework.integration.scheduling.Schedule; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; @@ -30,7 +30,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public abstract class AbstractPoller implements SubscribableSource, SchedulableTask, InitializingBean { +public abstract class AbstractPoller implements SchedulableTask, InitializingBean { public static final int MAX_MESSAGES_UNBOUNDED = -1; @@ -45,11 +45,11 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT private volatile TransactionTemplate transactionTemplate; - private volatile String propagationBehaviorName = "PROPAGATION_REQUIRED"; + private volatile int propagationBehavior = DefaultTransactionDefinition.PROPAGATION_REQUIRED; - private volatile String isolationLevelName = "ISOLATION_DEFAULT"; + private volatile int isolationLevel = DefaultTransactionDefinition.ISOLATION_DEFAULT; - private volatile int transactionTimeout = -1; + private volatile int transactionTimeout = DefaultTransactionDefinition.TIMEOUT_DEFAULT; private volatile boolean readOnly = false; @@ -94,12 +94,12 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT this.transactionManager = transactionManager; } - public void setPropagationBehaviorName(String propagationBehaviorName) { - this.propagationBehaviorName = propagationBehaviorName; + public void setPropagationBehavior(int propagationBehavior) { + this.propagationBehavior = propagationBehavior; } - public void setIsolationLevelName(String isolationLevelName) { - this.isolationLevelName = isolationLevelName; + public void setIsolationLevel(int isolationLevel) { + this.isolationLevel = isolationLevel; } public void setTransactionTimeout(int transactionTimeout) { @@ -124,8 +124,8 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT } if (this.transactionManager != null) { TransactionTemplate template = new TransactionTemplate(this.transactionManager); - template.setPropagationBehaviorName(this.propagationBehaviorName); - template.setIsolationLevelName(this.isolationLevelName); + template.setPropagationBehavior(this.propagationBehavior); + template.setIsolationLevel(this.isolationLevel); template.setTimeout(this.transactionTimeout); template.setReadOnly(this.readOnly); this.transactionTemplate = template; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java index e1942cbc4d..5669fb3090 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ChannelPoller.java @@ -19,6 +19,7 @@ package org.springframework.integration.endpoint; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.dispatcher.SimpleDispatcher; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.Schedule; import org.springframework.util.Assert; @@ -50,12 +51,12 @@ public class ChannelPoller extends AbstractPoller implements SubscribableSource this.receiveTimeout = receiveTimeout; } - public boolean subscribe(MessageEndpoint endpoint) { - return this.dispatcher.subscribe(endpoint); + public boolean subscribe(MessageConsumer consumer) { + return this.dispatcher.subscribe(consumer); } - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.dispatcher.unsubscribe(endpoint); + public boolean unsubscribe(MessageConsumer consumer) { + return this.dispatcher.unsubscribe(consumer); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/InboundChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/InboundChannelAdapter.java index 738a9f1a8b..a851fa5b27 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/InboundChannelAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/InboundChannelAdapter.java @@ -16,11 +16,12 @@ package org.springframework.integration.endpoint; +import org.springframework.context.Lifecycle; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageDeliveryAware; -import org.springframework.integration.message.MessageDeliveryException; -import org.springframework.integration.message.MessagingException; +import org.springframework.integration.message.MethodInvokingSource; +import org.springframework.integration.message.PollableSource; +import org.springframework.integration.scheduling.Schedule; +import org.springframework.integration.scheduling.TaskScheduler; /** * A Channel Adapter implementation for connecting a @@ -29,33 +30,75 @@ import org.springframework.integration.message.MessagingException; * * @author Mark Fisher */ -public class InboundChannelAdapter extends AbstractEndpoint { +public class InboundChannelAdapter extends AbstractEndpoint implements Lifecycle { - private MessageChannel channel; + private volatile PollableSource source; + private volatile MessageChannel channel; + + private volatile Schedule schedule; + + private volatile SourcePoller poller; + + private volatile int maxMessagesPerPoll = -1; + + private volatile boolean running; + + private final Object lifecycleMonitor = new Object(); + + + public void setSource(PollableSource source) { + this.source = source; + } public void setChannel(MessageChannel channel) { this.channel = channel; } - @Override - protected boolean sendInternal(Message message) { - if (this.channel == null) { - throw new MessageDeliveryException(message, "no channel has been provided"); + public void setSchedule(Schedule schedule) { + this.schedule = schedule; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + if (this.poller != null) { + this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); } - try { - boolean sent = this.getMessageExchangeTemplate().send(message, this.channel); - if (sent && this.getSource() instanceof MessageDeliveryAware) { - ((MessageDeliveryAware) this.getSource()).onSend(message); + } + + public final boolean isRunning() { + return this.running; + } + + public final void start() { + synchronized (this.lifecycleMonitor) { + if (this.running) { + return; + } + this.poller = new SourcePoller(source, channel, schedule); + if (maxMessagesPerPoll < 0 && source instanceof MethodInvokingSource) { + // the default is 1 since a MethodInvokingSource might return a non-null value + // every time it is invoked, thus producing an infinite number of messages per poll + maxMessagesPerPoll = 1; + } + this.configureTransactionSettingsForPoller(this.poller); + this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll); + TaskScheduler taskScheduler = this.getTaskScheduler(); + if (taskScheduler != null) { + taskScheduler.schedule(this.poller); } - return sent; } - catch (Exception e) { - if (this.getSource() instanceof MessageDeliveryAware) { - ((MessageDeliveryAware) this.getSource()).onFailure(message, e); + } + + public final void stop() { + synchronized (this.lifecycleMonitor) { + if (!this.running) { + return; + } + TaskScheduler taskScheduler = this.getTaskScheduler(); + if (taskScheduler != null) { + taskScheduler.cancel(this.poller, true); } - throw (e instanceof MessagingException) ? (MessagingException) e - : new MessageDeliveryException(message, "channel adapter failed to send message to target", e); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index 7361a06bd8..930b69da21 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -16,8 +16,6 @@ package org.springframework.integration.endpoint; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; /** * Base interface for message endpoints. @@ -28,8 +26,4 @@ public interface MessageEndpoint { String getName(); - MessageSource getSource(); - - boolean send(Message message); - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/OutboundChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/OutboundChannelAdapter.java index b294c197d6..3f3f437148 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/OutboundChannelAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/OutboundChannelAdapter.java @@ -18,6 +18,7 @@ package org.springframework.integration.endpoint; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageTarget; import org.springframework.util.Assert; @@ -27,7 +28,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class OutboundChannelAdapter extends AbstractEndpoint { +public class OutboundChannelAdapter extends AbstractMessageConsumingEndpoint { private final MessageTarget target; @@ -39,8 +40,10 @@ public class OutboundChannelAdapter extends AbstractEndpoint { @Override - protected boolean sendInternal(Message message) { - return this.target.send(message); + protected void processMessage(Message message) { + if (!this.target.send(message)) { + throw new MessageDeliveryException(message, "failed to deliver Message to target"); + } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java index 80a92e5d49..580ce5b9bc 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ServiceActivatorEndpoint.java @@ -50,6 +50,7 @@ public class ServiceActivatorEndpoint extends AbstractInOutEndpoint { @Override protected void initialize() throws Exception { + super.initialize(); if (this.invoker instanceof InitializingBean) { ((InitializingBean) this.invoker).afterPropertiesSet(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePoller.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePoller.java index e42cc73c44..127ab13670 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePoller.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePoller.java @@ -16,30 +16,34 @@ package org.springframework.integration.endpoint; -import org.springframework.integration.dispatcher.SimpleDispatcher; +import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.message.BlockingSource; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageDeliveryAware; +import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.message.MessagingException; import org.springframework.integration.message.PollableSource; -import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.Schedule; import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class SourcePoller extends AbstractPoller implements SubscribableSource { +public class SourcePoller extends AbstractPoller { private final PollableSource source; - private final SimpleDispatcher dispatcher = new SimpleDispatcher(); + private final MessageChannel channel; private volatile long receiveTimeout = 1000; - public SourcePoller(PollableSource source, Schedule schedule) { + public SourcePoller(PollableSource source, MessageChannel channel, Schedule schedule) { super(schedule); Assert.notNull(source, "source must not be null"); + Assert.notNull(channel, "channel must not be null"); this.source = source; + this.channel = channel; } @@ -54,14 +58,6 @@ public class SourcePoller extends AbstractPoller implements SubscribableSource { this.receiveTimeout = receiveTimeout; } - public boolean subscribe(MessageEndpoint endpoint) { - return this.dispatcher.subscribe(endpoint); - } - - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.dispatcher.unsubscribe(endpoint); - } - @Override protected boolean doPoll() { Message message = (this.receiveTimeout >= 0 && this.source instanceof BlockingSource) @@ -70,7 +66,20 @@ public class SourcePoller extends AbstractPoller implements SubscribableSource { if (message == null) { return false; } - return this.dispatcher.dispatch(message); + try { + boolean sent = this.channel.send(message); + if (sent && this.source instanceof MessageDeliveryAware) { + ((MessageDeliveryAware) this.source).onSend(message); + } + return sent; + } + catch (Exception e) { + if (this.source instanceof MessageDeliveryAware) { + ((MessageDeliveryAware) this.source).onFailure(message, e); + } + throw (e instanceof MessagingException) ? (MessagingException) e + : new MessageDeliveryException(message, "source poller failed to send message to channel", e); + } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java index 6f772eb486..0d459aadad 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/SimpleMessagingGateway.java @@ -205,7 +205,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M } ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(this.replyMapCapacity); correlator.setBeanName("internal.correlator." + this); - correlator.setSource(this.replyChannel); + correlator.setInputChannel(this.replyChannel); correlator.afterPropertiesSet(); this.endpointRegistry.registerEndpoint(correlator); this.replyMessageCorrelator = correlator; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java index 1cc02ea853..a840ae9b59 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/message/SubscribableSource.java @@ -16,8 +16,6 @@ package org.springframework.integration.message; -import org.springframework.integration.endpoint.MessageEndpoint; - /** * Interface for any source of messages that accepts subscribers. * @@ -26,13 +24,13 @@ import org.springframework.integration.endpoint.MessageEndpoint; public interface SubscribableSource extends MessageSource { /** - * Register a {@link MessageEndpoint} as a subscriber to this source. + * Register a {@link MessageConsumer} as a subscriber to this source. */ - boolean subscribe(MessageEndpoint endpoint); + boolean subscribe(MessageConsumer consumer); /** - * Remove a {@link MessageEndpoint} from the subscribers of this source. + * Remove a {@link MessageConsumer} from the subscribers of this source. */ - boolean unsubscribe(MessageEndpoint endpoint); + boolean unsubscribe(MessageConsumer consumer); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java index 50c6357d87..665c9a6434 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/router/RouterEndpoint.java @@ -21,7 +21,7 @@ import java.util.Collection; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageExchangeTemplate; @@ -30,7 +30,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class RouterEndpoint extends AbstractEndpoint { +public class RouterEndpoint extends AbstractMessageConsumingEndpoint { private final ChannelResolver channelResolver; @@ -78,7 +78,7 @@ public class RouterEndpoint extends AbstractEndpoint { } @Override - protected boolean sendInternal(Message message) { + protected void processMessage(Message message) { boolean sent = false; Collection results = this.channelResolver.resolveChannels(message); if (results != null) { @@ -99,7 +99,6 @@ public class RouterEndpoint extends AbstractEndpoint { "no target resolved by router and no default output channel defined"); } } - return sent; } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index dd1aa428df..b559c294ca 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -34,7 +34,6 @@ import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.endpoint.AbstractInOutEndpoint; import org.springframework.integration.endpoint.InboundChannelAdapter; -import org.springframework.integration.endpoint.SourcePoller; import org.springframework.integration.message.ErrorMessage; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; @@ -66,7 +65,7 @@ public class DefaultMessageBusTests { } }; endpoint.setBeanName("testEndpoint"); - endpoint.setSource(sourceChannel); + endpoint.setInputChannel(sourceChannel); bus.registerEndpoint(endpoint); bus.start(); Message result = targetChannel.receive(3000); @@ -126,10 +125,10 @@ public class DefaultMessageBusTests { bus.registerChannel(outputChannel1); bus.registerChannel(outputChannel2); endpoint1.setBeanName("testEndpoint1"); - endpoint1.setSource(inputChannel); + endpoint1.setInputChannel(inputChannel); endpoint1.setOutputChannel(outputChannel1); endpoint2.setBeanName("testEndpoint2"); - endpoint2.setSource(inputChannel); + endpoint2.setInputChannel(inputChannel); endpoint2.setOutputChannel(outputChannel2); bus.registerEndpoint(endpoint1); bus.registerEndpoint(endpoint2); @@ -169,10 +168,10 @@ public class DefaultMessageBusTests { bus.registerChannel(outputChannel1); bus.registerChannel(outputChannel2); endpoint1.setBeanName("testEndpoint1"); - endpoint1.setSource(inputChannel); + endpoint1.setInputChannel(inputChannel); endpoint1.setOutputChannel(outputChannel1); endpoint2.setBeanName("testEndpoint2"); - endpoint2.setSource(inputChannel); + endpoint2.setInputChannel(inputChannel); endpoint2.setOutputChannel(outputChannel2); bus.registerEndpoint(endpoint1); bus.registerEndpoint(endpoint2); @@ -191,18 +190,21 @@ public class DefaultMessageBusTests { public void testErrorChannelWithFailedDispatch() throws InterruptedException { MessageBus bus = new DefaultMessageBus(); QueueChannel errorChannel = new QueueChannel(); + QueueChannel outputChannel = new QueueChannel(); errorChannel.setBeanName("errorChannel"); bus.registerChannel(errorChannel); CountDownLatch latch = new CountDownLatch(1); InboundChannelAdapter channelAdapter = new InboundChannelAdapter(); - SourcePoller poller = new SourcePoller(new FailingSource(latch), new PollingSchedule(1000)); - channelAdapter.setSource(poller); + channelAdapter.setSource(new FailingSource(latch)); + channelAdapter.setSchedule(new PollingSchedule(1000)); + channelAdapter.setChannel(outputChannel); channelAdapter.setBeanName("testChannel"); bus.registerEndpoint(channelAdapter); bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); Message message = errorChannel.receive(5000); bus.stop(); + assertNull(outputChannel.receive(0)); assertNotNull("message should not be null", message); assertTrue(message instanceof ErrorMessage); Throwable exception = ((ErrorMessage) message).getPayload(); @@ -237,7 +239,7 @@ public class DefaultMessageBusTests { } }; endpoint.setBeanName("testEndpoint"); - endpoint.setSource(errorChannel); + endpoint.setInputChannel(errorChannel); bus.registerEndpoint(endpoint); bus.start(); errorChannel.send(new ErrorMessage(new RuntimeException("test-exception"))); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index 715f69515e..8a716c2e3b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -61,7 +61,7 @@ public class DirectChannelSubscriptionTests { public void testSendAndReceiveForRegisteredEndpoint() { MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle"); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker); - endpoint.setSource(sourceChannel); + endpoint.setInputChannel(sourceChannel); endpoint.setOutputChannel(targetChannel); endpoint.setBeanName("testEndpoint"); bus.registerEndpoint(endpoint); @@ -95,7 +95,7 @@ public class DirectChannelSubscriptionTests { throw new RuntimeException("intentional test failure"); } }; - endpoint.setSource(sourceChannel); + endpoint.setInputChannel(sourceChannel); endpoint.setOutputChannel(targetChannel); endpoint.setBeanName("testEndpoint"); bus.registerEndpoint(endpoint); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml index d859342660..6656eeef47 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml @@ -12,7 +12,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java index a0986cd4bb..f50cb53875 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/DirectChannelTests.java @@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.StringMessage; /** @@ -62,7 +60,7 @@ public class DirectChannelTests { } - private static class ThreadNameExtractingTestTarget implements MessageEndpoint { + private static class ThreadNameExtractingTestTarget implements MessageConsumer { private String threadName; @@ -77,21 +75,11 @@ public class DirectChannelTests { this.latch = latch; } - public boolean send(Message message) { + public void onMessage(Message message) { this.threadName = Thread.currentThread().getName(); if (this.latch != null) { this.latch.countDown(); } - return true; - } - - // TODO: remove once this is a consumer instead of endpoint - public String getName() { - return null; - } - - public MessageSource getSource() { - return null; } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java index 78dff2df94..0240dc075b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/config/TestSubscribableSource.java @@ -19,8 +19,8 @@ package org.springframework.integration.channel.config; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.SubscribableSource; /** @@ -28,20 +28,20 @@ import org.springframework.integration.message.SubscribableSource; */ public class TestSubscribableSource implements SubscribableSource { - private final List endpoints = new CopyOnWriteArrayList(); + private final List subscibers = new CopyOnWriteArrayList(); - public boolean subscribe(MessageEndpoint endpoint) { - return this.endpoints.add(endpoint); + public boolean subscribe(MessageConsumer subsciber) { + return this.subscibers.add(subsciber); } - public boolean unsubscribe(MessageEndpoint endpoint) { - return this.endpoints.remove(endpoint); + public boolean unsubscribe(MessageConsumer subsciber) { + return this.subscibers.remove(subsciber); } public void publishMessage(Message message) { - for (MessageEndpoint endpoint : this.endpoints) { - endpoint.send(message); + for (MessageConsumer subsciber : this.subscibers) { + subsciber.onMessage(message); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java index 0f5e8d69d7..a6a4eebe2f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/AggregatorParserTests.java @@ -61,7 +61,7 @@ public class AggregatorParserTests { outboundMessages.add(createMessage("789", "id1", 3, 3, null)); outboundMessages.add(createMessage("456", "id1", 3, 2, null)); for (Message message : outboundMessages) { - endpoint.send(message); + endpoint.onMessage(message); } Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean .getAggregatedMessages().size()); @@ -111,7 +111,7 @@ public class AggregatorParserTests { outboundMessages.add(createMessage(2l, "id1", 3, 3, null)); outboundMessages.add(createMessage(3l, "id1", 3, 2, null)); for (Message message : outboundMessages) { - addingAggregator.send(message); + addingAggregator.onMessage(message); } PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message response = outputChannel.receive(); @@ -140,13 +140,13 @@ public class AggregatorParserTests { MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker"); Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy); Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness")); - aggregatorWithPojoCompletionStrategy.send(createMessage(1l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.send(createMessage(2l, "id1", 0 , 0, null)); - aggregatorWithPojoCompletionStrategy.send(createMessage(3l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.onMessage(createMessage(1l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.onMessage(createMessage(2l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.onMessage(createMessage(3l, "id1", 0 , 0, null)); PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel"); Message reply = outputChannel.receive(0); Assert.assertNull(reply); - aggregatorWithPojoCompletionStrategy.send(createMessage(5l, "id1", 0 , 0, null)); + aggregatorWithPojoCompletionStrategy.onMessage(createMessage(5l, "id1", 0 , 0, null)); reply = outputChannel.receive(0); Assert.assertNotNull(reply); Assert.assertEquals(11l, reply.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java index f42173d1fe..5de34b920c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointParserTests.java @@ -19,7 +19,6 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import java.util.concurrent.TimeUnit; @@ -28,10 +27,10 @@ import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageBuilder; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.message.StringMessage; @@ -57,11 +56,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorAccepts() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint"); + MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); QueueChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); - assertTrue(endpoint.send(message)); + endpoint.onMessage(message); Message reply = replyChannel.receive(500); assertNotNull(reply); assertEquals("foo", reply.getPayload()); @@ -71,11 +70,11 @@ public class EndpointParserTests { public void testEndpointWithSelectorRejects() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "endpointWithSelector.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint"); + MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint"); MessageChannel replyChannel = new QueueChannel(); Message message = MessageBuilder.fromPayload(123) .setReturnAddress(replyChannel).build(); - endpoint.send(message); + endpoint.onMessage(message); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index cdf056d63f..a8f29171b5 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -44,11 +44,11 @@ import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.ChannelPoller; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.StringMessage; -import org.springframework.integration.message.SubscribableSource; import org.springframework.integration.scheduling.PollingSchedule; import org.springframework.integration.scheduling.Schedule; import org.springframework.integration.util.MethodInvoker; @@ -295,10 +295,10 @@ public class MessagingAnnotationPostProcessorTests { AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation(); postProcessor.postProcessAfterInitialization(endpoint, "testBean"); ServiceActivatorEndpoint processedEndpoint = (ServiceActivatorEndpoint) messageBus.lookupEndpoint("testBean.serviceActivator"); + processedEndpoint.afterPropertiesSet(); DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint); - MessageSource source = (MessageSource) accessor.getPropertyValue("source"); - assertTrue(source instanceof SubscribableSource); - Schedule schedule = (Schedule) new DirectFieldAccessor(source).getPropertyValue("schedule"); + ChannelPoller poller = (ChannelPoller) accessor.getPropertyValue("poller"); + Schedule schedule = (Schedule) new DirectFieldAccessor(poller).getPropertyValue("schedule"); assertEquals(PollingSchedule.class, schedule.getClass()); PollingSchedule pollingSchedule = (PollingSchedule) schedule; assertEquals(1234, pollingSchedule.getPeriod()); @@ -318,17 +318,10 @@ public class MessagingAnnotationPostProcessorTests { DirectChannel testChannel = (DirectChannel) messageBus.lookupChannel("testChannel"); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference> receivedMessage = new AtomicReference>(); - testChannel.subscribe(new org.springframework.integration.endpoint.MessageEndpoint() { - public boolean send(Message message) { + testChannel.subscribe(new MessageConsumer() { + public void onMessage(Message message) { receivedMessage.set(message); latch.countDown(); - return false; - } - public String getName() { - return null; - } - public MessageSource getSource() { - return null; } }); latch.await(3, TimeUnit.SECONDS); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index 840948bccc..ae0f073fa1 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -17,7 +17,6 @@ package org.springframework.integration.dispatcher; import static org.easymock.EasyMock.createMock; -import static org.easymock.EasyMock.expect; import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.getCurrentArguments; import static org.easymock.EasyMock.isA; @@ -35,9 +34,8 @@ import org.junit.Before; import org.junit.Test; import org.springframework.core.task.TaskExecutor; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageSource; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.StringMessage; /** @@ -52,11 +50,11 @@ public class BroadcastingDispatcherTests { private Message messageMock = createMock(Message.class); - private MessageEndpoint targetMock1 = createMock(MessageEndpoint.class); + private MessageConsumer targetMock1 = createMock(MessageConsumer.class); - private MessageEndpoint targetMock2 = createMock(MessageEndpoint.class); + private MessageConsumer targetMock2 = createMock(MessageConsumer.class); - private MessageEndpoint targetMock3 = createMock(MessageEndpoint.class); + private MessageConsumer targetMock3 = createMock(MessageConsumer.class); private Object[] globalMocks = new Object[] { messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 }; @@ -75,7 +73,8 @@ public class BroadcastingDispatcherTests { public void singleTargetWithoutTaskExecutor() throws Exception { dispatcher.setTaskExecutor(null); dispatcher.subscribe(targetMock1); - expect(targetMock1.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -84,7 +83,8 @@ public class BroadcastingDispatcherTests { @Test public void singleTargetWithTaskExecutor() throws Exception { dispatcher.subscribe(targetMock1); - expect(targetMock1.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -96,9 +96,12 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock1); dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); - expect(targetMock1.send(messageMock)).andReturn(true); - expect(targetMock2.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); + targetMock2.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -109,9 +112,12 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock1); dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); - expect(targetMock1.send(messageMock)).andReturn(true); - expect(targetMock2.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); + targetMock2.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -124,8 +130,10 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); partialFailingExecutorMock(false, true, true); - expect(targetMock2.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true); + targetMock2.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -138,8 +146,10 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); partialFailingExecutorMock(true, false, true); - expect(targetMock1.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -152,8 +162,10 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); partialFailingExecutorMock(true, true, false); - expect(targetMock1.send(messageMock)).andReturn(true); - expect(targetMock2.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); + targetMock2.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -176,7 +188,8 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock1); dispatcher.subscribe(targetMock1); dispatcher.subscribe(targetMock1); - expect(targetMock1.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -188,8 +201,10 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); dispatcher.unsubscribe(targetMock2); - expect(targetMock1.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true); + targetMock1.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall(); replay(globalMocks); dispatcher.dispatch(messageMock); verify(globalMocks); @@ -200,9 +215,12 @@ public class BroadcastingDispatcherTests { dispatcher.subscribe(targetMock1); dispatcher.subscribe(targetMock2); dispatcher.subscribe(targetMock3); - expect(targetMock1.send(messageMock)).andReturn(true).times(2); - expect(targetMock2.send(messageMock)).andReturn(true); - expect(targetMock3.send(messageMock)).andReturn(true).times(2); + targetMock1.onMessage(messageMock); + expectLastCall().times(2); + targetMock2.onMessage(messageMock); + expectLastCall(); + targetMock3.onMessage(messageMock); + expectLastCall().times(2); replay(globalMocks); dispatcher.dispatch(messageMock); dispatcher.unsubscribe(targetMock2); @@ -214,8 +232,8 @@ public class BroadcastingDispatcherTests { public void applySequenceDisabledByDefault() { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); final List> messages = Collections.synchronizedList(new ArrayList>()); - MessageEndpoint target1 = new MessageStoringTestEndpoint(messages); - MessageEndpoint target2 = new MessageStoringTestEndpoint(messages); + MessageConsumer target1 = new MessageStoringTestEndpoint(messages); + MessageConsumer target2 = new MessageStoringTestEndpoint(messages); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.dispatch(new StringMessage("test")); @@ -231,9 +249,9 @@ public class BroadcastingDispatcherTests { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); dispatcher.setApplySequence(true); final List> messages = Collections.synchronizedList(new ArrayList>()); - MessageEndpoint target1 = new MessageStoringTestEndpoint(messages); - MessageEndpoint target2 = new MessageStoringTestEndpoint(messages); - MessageEndpoint target3 = new MessageStoringTestEndpoint(messages); + MessageConsumer target1 = new MessageStoringTestEndpoint(messages); + MessageConsumer target2 = new MessageStoringTestEndpoint(messages); + MessageConsumer target3 = new MessageStoringTestEndpoint(messages); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -276,7 +294,7 @@ public class BroadcastingDispatcherTests { } - private static class MessageStoringTestEndpoint implements MessageEndpoint { + private static class MessageStoringTestEndpoint implements MessageConsumer { private final List> messageList; @@ -284,17 +302,8 @@ public class BroadcastingDispatcherTests { this.messageList = messageList; } - public boolean send(Message message) { + public void onMessage(Message message) { this.messageList.add(message); - return true; - } - - public String getName() { - return null; - } - - public MessageSource getSource() { - return null; } }; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index b4bd9ab78b..19400cdfae 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -27,13 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.springframework.integration.endpoint.AbstractInOutEndpoint; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.ServiceActivatorEndpoint; import org.springframework.integration.handler.TestHandlers; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageRejectedException; -import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.StringMessage; import org.springframework.integration.message.selector.MessageSelector; @@ -70,10 +69,15 @@ public class SimpleDispatcherTests { public void noDuplicateSubscriptions() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target = new CountingTestEndpoint(counter, false); + MessageConsumer target = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target); dispatcher.subscribe(target); - dispatcher.dispatch(new StringMessage("test")); + try { + dispatcher.dispatch(new StringMessage("test")); + } + catch (Exception e) { + // ignore + } assertEquals("target should not have duplicate subscriptions", 1, counter.get()); } @@ -81,14 +85,19 @@ public class SimpleDispatcherTests { public void unsubscribeBeforeSend() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target1 = new CountingTestEndpoint(counter, false); - MessageEndpoint target2 = new CountingTestEndpoint(counter, false); - MessageEndpoint target3 = new CountingTestEndpoint(counter, false); + MessageConsumer target1 = new CountingTestEndpoint(counter, false); + MessageConsumer target2 = new CountingTestEndpoint(counter, false); + MessageConsumer target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); dispatcher.unsubscribe(target2); - dispatcher.dispatch(new StringMessage("test")); + try { + dispatcher.dispatch(new StringMessage("test")); + } + catch (Exception e) { + // ignore + } assertEquals(2, counter.get()); } @@ -96,19 +105,34 @@ public class SimpleDispatcherTests { public void unsubscribeBetweenSends() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target1 = new CountingTestEndpoint(counter, false); - MessageEndpoint target2 = new CountingTestEndpoint(counter, false); - MessageEndpoint target3 = new CountingTestEndpoint(counter, false); + MessageConsumer target1 = new CountingTestEndpoint(counter, false); + MessageConsumer target2 = new CountingTestEndpoint(counter, false); + MessageConsumer target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); - dispatcher.dispatch(new StringMessage("test1")); + try { + dispatcher.dispatch(new StringMessage("test1")); + } + catch (Exception e) { + // ignore + } assertEquals(3, counter.get()); dispatcher.unsubscribe(target2); - dispatcher.dispatch(new StringMessage("test2")); + try { + dispatcher.dispatch(new StringMessage("test2")); + } + catch (Exception e) { + // ignore + } assertEquals(5, counter.get()); dispatcher.unsubscribe(target1); - dispatcher.dispatch(new StringMessage("test3")); + try { + dispatcher.dispatch(new StringMessage("test3")); + } + catch (Exception e) { + // ignore + } assertEquals(6, counter.get()); } @@ -116,9 +140,14 @@ public class SimpleDispatcherTests { public void unsubscribeLastTargetCausesDeliveryException() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target = new CountingTestEndpoint(counter, false); + MessageConsumer target = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target); - dispatcher.dispatch(new StringMessage("test1")); + try { + dispatcher.dispatch(new StringMessage("test1")); + } + catch (Exception e) { + // ignore + } assertEquals(1, counter.get()); dispatcher.unsubscribe(target); dispatcher.dispatch(new StringMessage("test2")); @@ -184,9 +213,9 @@ public class SimpleDispatcherTests { public void firstHandlerReturnsTrue() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target1 = new CountingTestEndpoint(counter, true); - MessageEndpoint target2 = new CountingTestEndpoint(counter, false); - MessageEndpoint target3 = new CountingTestEndpoint(counter, false); + MessageConsumer target1 = new CountingTestEndpoint(counter, true); + MessageConsumer target2 = new CountingTestEndpoint(counter, false); + MessageConsumer target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -198,9 +227,9 @@ public class SimpleDispatcherTests { public void middleHandlerReturnsTrue() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target1 = new CountingTestEndpoint(counter, false); - MessageEndpoint target2 = new CountingTestEndpoint(counter, true); - MessageEndpoint target3 = new CountingTestEndpoint(counter, false); + MessageConsumer target1 = new CountingTestEndpoint(counter, false); + MessageConsumer target2 = new CountingTestEndpoint(counter, true); + MessageConsumer target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); @@ -212,13 +241,18 @@ public class SimpleDispatcherTests { public void allHandlersReturnFalse() { SimpleDispatcher dispatcher = new SimpleDispatcher(); final AtomicInteger counter = new AtomicInteger(); - MessageEndpoint target1 = new CountingTestEndpoint(counter, false); - MessageEndpoint target2 = new CountingTestEndpoint(counter, false); - MessageEndpoint target3 = new CountingTestEndpoint(counter, false); + MessageConsumer target1 = new CountingTestEndpoint(counter, false); + MessageConsumer target2 = new CountingTestEndpoint(counter, false); + MessageConsumer target3 = new CountingTestEndpoint(counter, false); dispatcher.subscribe(target1); dispatcher.subscribe(target2); dispatcher.subscribe(target3); - assertFalse(dispatcher.dispatch(new StringMessage("test"))); + try { + assertFalse(dispatcher.dispatch(new StringMessage("test"))); + } + catch (Exception e) { + // ignore + } assertEquals("each target should have been invoked", 3, counter.get()); } @@ -246,28 +280,22 @@ public class SimpleDispatcherTests { } - private static class CountingTestEndpoint implements MessageEndpoint { + private static class CountingTestEndpoint implements MessageConsumer { private final AtomicInteger counter; - private final boolean returnValue; + private final boolean shouldAccept; - CountingTestEndpoint(AtomicInteger counter, boolean returnValue) { + CountingTestEndpoint(AtomicInteger counter, boolean shouldAccept) { this.counter = counter; - this.returnValue = returnValue; + this.shouldAccept = shouldAccept; } - public boolean send(Message message) { + public void onMessage(Message message) { this.counter.incrementAndGet(); - return this.returnValue; - } - - public String getName() { - return null; - } - - public MessageSource getSource() { - return null; + if (!this.shouldAccept) { + throw new MessageRejectedException(message, "intentional test failure"); + } } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java index 7ad8878d03..cd3455219f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ChannelPollerTests.java @@ -18,6 +18,7 @@ package org.springframework.integration.endpoint; import static org.easymock.EasyMock.createMock; import static org.easymock.EasyMock.expect; +import static org.easymock.EasyMock.expectLastCall; import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.reset; import static org.easymock.EasyMock.verify; @@ -26,9 +27,9 @@ import org.junit.Before; import org.junit.Test; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.endpoint.ChannelPoller; -import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageConsumer; +import org.springframework.integration.message.MessageRejectedException; import org.springframework.integration.scheduling.Schedule; /** @@ -40,7 +41,7 @@ public class ChannelPollerTests { private ChannelPoller poller; private Schedule scheduleMock = createMock(Schedule.class); private PollableChannel channelMock = createMock(PollableChannel.class); - private MessageEndpoint endpointMock = createMock(MessageEndpoint.class); + private MessageConsumer endpointMock = createMock(MessageConsumer.class); private Message messageMock = createMock(Message.class); private Object[] globalMocks = new Object[] { scheduleMock, channelMock, endpointMock, messageMock }; @@ -57,7 +58,8 @@ public class ChannelPollerTests { @Test public void singleMessage() { expect(channelMock.receive()).andReturn(messageMock); - expect(endpointMock.send(messageMock)).andReturn(true); + endpointMock.onMessage(messageMock); + expectLastCall(); replay(globalMocks); poller.setMaxMessagesPerPoll(1); poller.run(); @@ -67,7 +69,8 @@ public class ChannelPollerTests { @Test public void multipleMessages() { expect(channelMock.receive()).andReturn(messageMock).times(5); - expect(endpointMock.send(messageMock)).andReturn(true).times(5); + endpointMock.onMessage(messageMock); + expectLastCall().times(5); replay(globalMocks); poller.setMaxMessagesPerPoll(5); poller.run(); @@ -78,26 +81,29 @@ public class ChannelPollerTests { public void multipleMessages_underrun() { expect(channelMock.receive()).andReturn(messageMock).times(5); expect(channelMock.receive()).andReturn(null); - expect(endpointMock.send(messageMock)).andReturn(true).times(5); + endpointMock.onMessage(messageMock); + expectLastCall().times(5); replay(globalMocks); poller.setMaxMessagesPerPoll(6); poller.run(); verify(globalMocks); } - @Test - public void droppedMessage() { + @Test(expected = MessageRejectedException.class) + public void rejectedMessage() { expect(channelMock.receive()).andReturn(messageMock); - expect(endpointMock.send(messageMock)).andReturn(false); + endpointMock.onMessage(messageMock); + expectLastCall().andThrow(new MessageRejectedException(messageMock, "intentional test failure")); replay(globalMocks); poller.run(); verify(globalMocks); } - @Test + @Test(expected = MessageRejectedException.class) public void droppedMessage_onePerPoll() { expect(channelMock.receive()).andReturn(messageMock).times(1); - expect(endpointMock.send(messageMock)).andReturn(false).anyTimes(); + endpointMock.onMessage(messageMock); + expectLastCall().andThrow(new MessageRejectedException(messageMock, "intentional test failure")).anyTimes(); replay(globalMocks); poller.setMaxMessagesPerPoll(10); poller.run(); @@ -121,9 +127,11 @@ public class ChannelPollerTests { poller = new ChannelPoller(channelMock, scheduleMock); poller.subscribe(endpointMock); expect(channelMock.receive(1)).andReturn(messageMock); - expect(endpointMock.send(messageMock)).andReturn(false); + endpointMock.onMessage(messageMock); + expectLastCall(); replay(globalMocks); poller.setReceiveTimeout(1); + poller.setMaxMessagesPerPoll(1); poller.run(); verify(globalMocks); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java deleted file mode 100644 index 05049dadab..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/MessagingBridgeTests.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.endpoint; - -import static org.junit.Assert.assertEquals; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.Test; - -import org.springframework.integration.bus.DefaultMessageBus; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.PollableSource; -import org.springframework.integration.message.StringMessage; -import org.springframework.integration.scheduling.PollingSchedule; - -/** - * @author Mark Fisher - */ -public class MessagingBridgeTests { - - @Test - public void simplePassThrough() throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(1); - DefaultMessageBus bus = new DefaultMessageBus(); - MessagingBridge bridge = new MessagingBridge(new MessageTarget() { - public boolean send(Message message) { - latch.countDown(); - return true; - } - }); - bridge.setBeanName("bridge"); - PollableSource source = new PollableSource() { - public Message receive() { - return new StringMessage("test"); - } - }; - SourcePoller poller = new SourcePoller(source, new PollingSchedule(1000)); - poller.setMaxMessagesPerPoll(1); - bridge.setSource(poller); - bus.registerEndpoint(bridge); - bus.start(); - latch.await(1, TimeUnit.SECONDS); - bus.stop(); - assertEquals(0, latch.getCount()); - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java index deb42b58c3..7d6614ff85 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/ServiceActivatorEndpointTests.java @@ -54,7 +54,7 @@ public class ServiceActivatorEndpointTests { ServiceActivatorEndpoint endpoint = this.createEndpoint(); endpoint.setOutputChannel(channel); Message message = MessageBuilder.fromPayload("foo").build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = channel.receive(0); assertNotNull(reply); assertEquals("FOO", reply.getPayload()); @@ -67,7 +67,7 @@ public class ServiceActivatorEndpointTests { ServiceActivatorEndpoint endpoint = this.createEndpoint(); endpoint.setOutputChannel(channel1); Message message = MessageBuilder.fromPayload("foo").setReturnAddress(channel2).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply1 = channel1.receive(0); assertNotNull(reply1); assertEquals("FOO", reply1.getPayload()); @@ -80,7 +80,7 @@ public class ServiceActivatorEndpointTests { QueueChannel channel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = this.createEndpoint(); Message message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = channel.receive(0); assertNotNull(reply); assertEquals("FOO", reply.getPayload()); @@ -95,7 +95,7 @@ public class ServiceActivatorEndpointTests { ServiceActivatorEndpoint endpoint = this.createEndpoint(); endpoint.setChannelRegistry(channelRegistry); Message message = MessageBuilder.fromPayload("foo").setReturnAddress("testChannel").build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = channel.receive(0); assertNotNull(reply); assertEquals("FOO", reply.getPayload()); @@ -118,7 +118,7 @@ public class ServiceActivatorEndpointTests { endpoint.setChannelRegistry(channelRegistry); Message testMessage1 = MessageBuilder.fromPayload("bar") .setReturnAddress(replyChannel1).build(); - endpoint.send(testMessage1); + endpoint.onMessage(testMessage1); Message reply1 = replyChannel1.receive(50); assertNotNull(reply1); assertEquals("foobar", reply1.getPayload()); @@ -126,7 +126,7 @@ public class ServiceActivatorEndpointTests { assertNull(reply2); Message testMessage2 = MessageBuilder.fromMessage(testMessage1) .setReturnAddress("replyChannel2").build(); - endpoint.send(testMessage2); + endpoint.onMessage(testMessage2); reply1 = replyChannel1.receive(0); assertNull(reply1); reply2 = replyChannel2.receive(0); @@ -139,7 +139,7 @@ public class ServiceActivatorEndpointTests { QueueChannel channel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = this.createEndpoint(); Message message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = channel.receive(0); assertNotNull(reply); assertEquals("FOO", reply.getPayload()); @@ -149,7 +149,7 @@ public class ServiceActivatorEndpointTests { public void noReplyTarget() { ServiceActivatorEndpoint endpoint = this.createEndpoint(); Message message = MessageBuilder.fromPayload("foo").build(); - endpoint.send(message); + endpoint.onMessage(message); } @Test @@ -159,7 +159,7 @@ public class ServiceActivatorEndpointTests { new TestNullReplyBean(), "handle"); endpoint.setOutputChannel(channel); Message message = MessageBuilder.fromPayload("foo").build(); - endpoint.send(message); + endpoint.onMessage(message); assertNull(channel.receive(0)); } @@ -171,7 +171,7 @@ public class ServiceActivatorEndpointTests { endpoint.setRequiresReply(true); endpoint.setOutputChannel(channel); Message message = MessageBuilder.fromPayload("foo").build(); - endpoint.send(message); + endpoint.onMessage(message); } @Test(expected=MessageRejectedException.class) @@ -183,7 +183,7 @@ public class ServiceActivatorEndpointTests { return false; } }); - endpoint.send(new StringMessage("test")); + endpoint.onMessage(new StringMessage("test")); } @Test @@ -196,7 +196,7 @@ public class ServiceActivatorEndpointTests { return true; } }); - endpoint.send(new StringMessage("test")); + endpoint.onMessage(new StringMessage("test")); latch.await(100, TimeUnit.MILLISECONDS); assertEquals("handler should have been invoked", 0, latch.getCount()); } @@ -222,7 +222,7 @@ public class ServiceActivatorEndpointTests { endpoint.setSelector(selectorChain); boolean exceptionWasThrown = false; try { - endpoint.send(new StringMessage("test")); + endpoint.onMessage(new StringMessage("test")); } catch (MessageRejectedException e) { exceptionWasThrown = true; @@ -253,7 +253,7 @@ public class ServiceActivatorEndpointTests { endpoint.setSelector(selectorChain); boolean exceptionWasThrown = false; try { - endpoint.send(new StringMessage("test")); + endpoint.onMessage(new StringMessage("test")); } catch (MessageRejectedException e) { exceptionWasThrown = true; @@ -282,7 +282,7 @@ public class ServiceActivatorEndpointTests { } }); endpoint.setSelector(selectorChain); - assertTrue(endpoint.send(new StringMessage("test"))); + endpoint.onMessage(new StringMessage("test")); assertEquals("both selectors and handler should have been invoked", 3, counter.get()); } @@ -297,7 +297,7 @@ public class ServiceActivatorEndpointTests { }, "handle"); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = replyChannel.receive(500); assertNull(reply.getHeaders().getCorrelationId()); } @@ -313,7 +313,7 @@ public class ServiceActivatorEndpointTests { }, "handle"); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = replyChannel.receive(500); assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId()); } @@ -330,7 +330,7 @@ public class ServiceActivatorEndpointTests { }, "handle"); Message message = MessageBuilder.fromPayload("test") .setReturnAddress(replyChannel).build(); - endpoint.send(message); + endpoint.onMessage(message); Message reply = replyChannel.receive(500); Object correlationId = reply.getHeaders().getCorrelationId(); assertFalse(message.getHeaders().getId().equals(correlationId)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java index 1545d86646..d025260f18 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/CorrelationIdTests.java @@ -44,9 +44,9 @@ public class CorrelationIdTests { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setSource(inputChannel); + endpoint.setInputChannel(inputChannel); endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); + endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); assertEquals(correlationId, reply.getHeaders().getCorrelationId()); @@ -58,9 +58,9 @@ public class CorrelationIdTests { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setSource(inputChannel); + endpoint.setInputChannel(inputChannel); endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); + endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId()); @@ -73,9 +73,9 @@ public class CorrelationIdTests { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase"); - endpoint.setSource(inputChannel); + endpoint.setInputChannel(inputChannel); endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); + endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); assertEquals(message.getHeaders().getCorrelationId(), reply.getHeaders().getCorrelationId()); @@ -90,9 +90,9 @@ public class CorrelationIdTests { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setSource(inputChannel); + endpoint.setInputChannel(inputChannel); endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); + endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); assertEquals("456-XYZ", reply.getHeaders().getCorrelationId()); @@ -104,9 +104,9 @@ public class CorrelationIdTests { DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(1); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage"); - endpoint.setSource(inputChannel); + endpoint.setInputChannel(inputChannel); endpoint.setOutputChannel(outputChannel); - endpoint.afterPropertiesSet(); + endpoint.start(); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); assertEquals("456-XYZ", reply.getHeaders().getCorrelationId()); @@ -121,7 +121,7 @@ public class CorrelationIdTests { SplitterEndpoint endpoint = new SplitterEndpoint(splitter); endpoint.setOutputChannel(testChannel); splitter.afterPropertiesSet(); - endpoint.send(message); + endpoint.onMessage(message); Message reply1 = testChannel.receive(100); Message reply2 = testChannel.receive(100); assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MessageFilterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MessageFilterTests.java index ab9112e4fb..a43731576c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MessageFilterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MessageFilterTests.java @@ -64,9 +64,9 @@ public class MessageFilterTests { return true; } }); - filter.setSource(inputChannel); + filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.afterPropertiesSet(); + filter.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); Message reply = outputChannel.receive(0); @@ -83,9 +83,9 @@ public class MessageFilterTests { return false; } }); - filter.setSource(inputChannel); + filter.setInputChannel(inputChannel); filter.setOutputChannel(outputChannel); - filter.afterPropertiesSet(); + filter.start(); Message message = new StringMessage("test"); assertTrue(inputChannel.send(message)); assertNull(outputChannel.receive(0)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java index 9567acfe3c..9c3d2d1c33 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/MethodInvokingTargetTests.java @@ -91,7 +91,7 @@ public class MethodInvokingTargetTests { bus.registerChannel(channel); ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(target); endpoint.setBeanName("testEndpoint"); - endpoint.setSource(channel); + endpoint.setInputChannel(channel); bus.registerEndpoint(endpoint); bus.start(); String result = queue.poll(1000, TimeUnit.MILLISECONDS); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java index e1d56e34e1..79696ef082 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MessageExchangeTemplateTests.java @@ -53,7 +53,7 @@ public class MessageExchangeTemplateTests { MessageBus bus = new DefaultMessageBus(); bus.registerChannel(requestChannel); endpoint.setBeanName("testEndpoint"); - endpoint.setSource(requestChannel); + endpoint.setInputChannel(requestChannel); bus.registerEndpoint(endpoint); bus.start(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java index b5ad6a2147..308d0cfc25 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/MethodInvokingRouterTests.java @@ -17,10 +17,8 @@ package org.springframework.integration.router; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import java.lang.reflect.Method; import java.util.ArrayList; @@ -57,7 +55,7 @@ public class MethodInvokingRouterTests { RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setChannelRegistry(channelRegistry); Message message = new GenericMessage("bar"); - assertTrue(endpoint.send(message)); + endpoint.onMessage(message); Message replyMessage = barChannel.receive(); assertNotNull(replyMessage); assertEquals(message, replyMessage); @@ -74,7 +72,7 @@ public class MethodInvokingRouterTests { RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setChannelRegistry(channelRegistry); Message message = new GenericMessage("bar"); - assertTrue(endpoint.send(message)); + endpoint.onMessage(message); Message replyMessage = barChannel.receive(); assertNotNull(replyMessage); assertEquals(message, replyMessage); @@ -96,7 +94,7 @@ public class MethodInvokingRouterTests { endpoint.setChannelRegistry(channelRegistry); Message message = MessageBuilder.fromPayload("bar") .setHeader("targetChannel", "foo").build(); - assertTrue(endpoint.send(message)); + endpoint.onMessage(message); Message fooReply = fooChannel.receive(0); Message barReply = barChannel.receive(0); assertNotNull(fooReply); @@ -110,7 +108,7 @@ public class MethodInvokingRouterTests { Method routingMethod = testBean.getClass().getMethod("routeByHeader", String.class); MethodInvokingChannelResolver resolver = new MethodInvokingChannelResolver(testBean, routingMethod); RouterEndpoint endpoint = new RouterEndpoint(resolver); - endpoint.send(new GenericMessage("testing")); + endpoint.onMessage(new GenericMessage("testing")); } @Test @@ -142,15 +140,15 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -183,15 +181,15 @@ public class MethodInvokingRouterTests { channelRegistry.registerChannel(fooChannel); channelRegistry.registerChannel(barChannel); endpoint.setChannelRegistry(channelRegistry); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -224,15 +222,15 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1 = fooChannel.receive(0); assertNotNull(result1); assertEquals("foo", result1.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2 = barChannel.receive(0); assertNotNull(result2); assertEquals("bar", result2.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -265,21 +263,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -312,21 +310,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); Message result1b = barChannel.receive(0); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); Message result2b = barChannel.receive(0); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -359,21 +357,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); Message result1b = barChannel.receive(0); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); Message result2b = barChannel.receive(0); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -406,21 +404,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -453,21 +451,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } @Test @@ -500,21 +498,21 @@ public class MethodInvokingRouterTests { Message fooMessage = new StringMessage("foo"); Message barMessage = new StringMessage("bar"); Message badMessage = new StringMessage("bad"); - assertTrue(endpoint.send(fooMessage)); + endpoint.onMessage(fooMessage); Message result1a = fooChannel.receive(0); Message result1b = barChannel.receive(0); assertNotNull(result1a); assertEquals("foo", result1a.getPayload()); assertNotNull(result1b); assertEquals("foo", result1b.getPayload()); - assertTrue(endpoint.send(barMessage)); + endpoint.onMessage(barMessage); Message result2a = fooChannel.receive(0); Message result2b = barChannel.receive(0); assertNotNull(result2a); assertEquals("bar", result2a.getPayload()); assertNotNull(result2b); assertEquals("bar", result2b.getPayload()); - assertFalse(endpoint.send(badMessage)); + endpoint.onMessage(badMessage); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java index 9b7aaf0c23..efef14f7ff 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/MultiChannelRouterTests.java @@ -51,7 +51,7 @@ public class MultiChannelRouterTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelResolver); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -77,7 +77,7 @@ public class MultiChannelRouterTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -97,7 +97,7 @@ public class MultiChannelRouterTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } @Test(expected = MessagingException.class) @@ -109,7 +109,7 @@ public class MultiChannelRouterTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java index 9c4ede3db5..98876df2bc 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/PayloadTypeRouterTests.java @@ -71,8 +71,8 @@ public class PayloadTypeRouterTests { endpoint.setChannelRegistry(channelRegistry); Message message1 = new StringMessage("test"); Message message2 = new GenericMessage(123); - endpoint.send(message1); - endpoint.send(message2); + endpoint.onMessage(message1); + endpoint.onMessage(message2); Message reply1 = stringChannel.receive(0); Message reply2 = integerChannel.receive(0); assertEquals("test", reply1.getPayload()); @@ -96,8 +96,8 @@ public class PayloadTypeRouterTests { endpoint.setDefaultOutputChannel(defaultChannel); Message message1 = new StringMessage("test"); Message message2 = new GenericMessage(123); - endpoint.send(message1); - endpoint.send(message2); + endpoint.onMessage(message1); + endpoint.onMessage(message2); Message result1 = stringChannel.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java index 738119a2ba..c67f1d5113 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RecipientListRouterTests.java @@ -69,7 +69,7 @@ public class RecipientListRouterTests { resolver.afterPropertiesSet(); RouterEndpoint endpoint = new RouterEndpoint(resolver); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -93,7 +93,7 @@ public class RecipientListRouterTests { RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); @@ -117,7 +117,7 @@ public class RecipientListRouterTests { RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result1 = channel1.receive(25); assertNotNull(result1); assertEquals("test", result1.getPayload()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java index 64cc4573f8..56fb7161ea 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RootCauseErrorMessageRouterTests.java @@ -65,7 +65,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); @@ -87,7 +87,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(runtimeExceptionChannel.receive(1000)); assertNull(illegalArgumentChannel.receive(0)); assertNull(defaultChannel.receive(0)); @@ -108,7 +108,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(messageHandlingExceptionChannel.receive(1000)); assertNull(runtimeExceptionChannel.receive(0)); assertNull(illegalArgumentChannel.receive(0)); @@ -125,7 +125,7 @@ public class RootCauseErrorMessageRouterTests { RootCauseErrorMessageChannelResolver resolver = new RootCauseErrorMessageChannelResolver(); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(defaultChannel.receive(1000)); assertNull(runtimeExceptionChannel.receive(0)); assertNull(illegalArgumentChannel.receive(0)); @@ -146,7 +146,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setResolutionRequired(true); - endpoint.send(message); + endpoint.onMessage(message); } @Test @@ -165,7 +165,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); @@ -187,7 +187,7 @@ public class RootCauseErrorMessageRouterTests { resolver.setChannelMappings(channelMappings); RouterEndpoint endpoint = new RouterEndpoint(resolver); endpoint.setDefaultOutputChannel(defaultChannel); - endpoint.send(message); + endpoint.onMessage(message); assertNotNull(illegalArgumentChannel.receive(1000)); assertNull(defaultChannel.receive(0)); assertNull(runtimeExceptionChannel.receive(0)); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java index 9f81e8bccd..ea4f1f261c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/RouterEndpointTests.java @@ -16,8 +16,6 @@ package org.springframework.integration.router; -import static org.junit.Assert.assertFalse; - import java.util.Collections; import java.util.List; @@ -45,7 +43,7 @@ public class RouterEndpointTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelResolver); Message message = new StringMessage("test"); - assertFalse(endpoint.send(message)); + endpoint.onMessage(message); } @Test(expected = MessageDeliveryException.class) @@ -58,7 +56,7 @@ public class RouterEndpointTests { RouterEndpoint endpoint = new RouterEndpoint(channelResolver); endpoint.setResolutionRequired(true); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } @Test @@ -70,7 +68,7 @@ public class RouterEndpointTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelResolver); Message message = new StringMessage("test"); - assertFalse(endpoint.send(message)); + endpoint.onMessage(message); } @Test(expected = MessageDeliveryException.class) @@ -83,7 +81,7 @@ public class RouterEndpointTests { RouterEndpoint endpoint = new RouterEndpoint(channelResolver); endpoint.setResolutionRequired(true); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } @Test @@ -97,7 +95,7 @@ public class RouterEndpointTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - assertFalse(endpoint.send(message)); + endpoint.onMessage(message); } @Test(expected = MessageDeliveryException.class) @@ -112,7 +110,7 @@ public class RouterEndpointTests { endpoint.setChannelRegistry(channelRegistry); endpoint.setResolutionRequired(true); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } @@ -127,7 +125,7 @@ public class RouterEndpointTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - assertFalse(endpoint.send(message)); + endpoint.onMessage(message); } @Test(expected = MessageDeliveryException.class) @@ -142,7 +140,7 @@ public class RouterEndpointTests { endpoint.setChannelRegistry(channelRegistry); endpoint.setResolutionRequired(true); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } @Test(expected = MessagingException.class) @@ -153,7 +151,7 @@ public class RouterEndpointTests { } }; RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); - endpoint.send(new StringMessage("this should fail")); + endpoint.onMessage(new StringMessage("this should fail")); } @Test(expected = MessagingException.class) @@ -164,7 +162,7 @@ public class RouterEndpointTests { } }; RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); - endpoint.send(new StringMessage("this should fail")); + endpoint.onMessage(new StringMessage("this should fail")); } @Test(expected = IllegalArgumentException.class) diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java index 71ba59606d..f4ba49071a 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/router/SingleChannelRouterTests.java @@ -17,7 +17,6 @@ package org.springframework.integration.router; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import org.junit.Test; @@ -45,7 +44,7 @@ public class SingleChannelRouterTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelResolver); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result = channel.receive(25); assertNotNull(result); assertEquals("test", result.getPayload()); @@ -65,14 +64,14 @@ public class SingleChannelRouterTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); Message result = channel.receive(25); assertNotNull(result); assertEquals("test", result.getPayload()); } @Test - public void nullChannelResult() { + public void nullChannelResultIgnored() { AbstractSingleChannelResolver channelResolver = new AbstractSingleChannelResolver() { public MessageChannel resolveChannel(Message message) { return null; @@ -80,7 +79,7 @@ public class SingleChannelRouterTests { }; RouterEndpoint endpoint = new RouterEndpoint(channelResolver); Message message = new StringMessage("test"); - assertFalse(endpoint.send(message)); + endpoint.onMessage(message); } @Test(expected = MessagingException.class) @@ -94,7 +93,7 @@ public class SingleChannelRouterTests { RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver); endpoint.setChannelRegistry(channelRegistry); Message message = new StringMessage("test"); - endpoint.send(message); + endpoint.onMessage(message); } }