From ca7dc3922e399f07da55ecf470dbe37ccd69c556 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Mon, 14 Jul 2008 18:29:07 +0000 Subject: [PATCH] Endpoints now expose a setter for a Poller strategy and no longer implement Lifecycle. --- .../adapter/stream/ByteStreamSourceTests.java | 6 +- .../stream/CharacterStreamSourceTests.java | 2 +- .../integration/bus/DefaultMessageBus.java | 17 +- .../SourceAnnotationPostProcessor.java | 2 +- .../endpoint/AbstractEndpoint.java | 195 ++++++++++-------- .../integration/endpoint/HandlerEndpoint.java | 35 +--- .../integration/endpoint/MessageEndpoint.java | 5 + .../integration/endpoint/SourceEndpoint.java | 49 +---- .../integration/endpoint/TargetEndpoint.java | 85 +------- .../gateway/RequestReplyTemplate.java | 6 +- .../bus/DefaultMessageBusTests.java | 2 +- .../integration/bus/messageBusTests.xml | 2 +- .../config/EndpointInterceptorTests.java | 12 +- ...MessagingAnnotationPostProcessorTests.java | 9 +- .../BroadcastingDispatcherTests.java | 4 +- .../dispatcher/SimpleDispatcherTests.java | 4 +- .../endpoint/HandlerEndpointTests.java | 46 ----- .../endpoint/SourceEndpointTests.java | 13 +- .../integration/handler/adapterTests.xml | 4 +- 19 files changed, 169 insertions(+), 329 deletions(-) diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java index 9785eade90..69be88d7fb 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/ByteStreamSourceTests.java @@ -43,7 +43,7 @@ public class ByteStreamSourceTests { MessageChannel channel = new QueueChannel(); ByteStreamSource source = new ByteStreamSource(stream); SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); + endpoint.setTarget(channel); endpoint.afterPropertiesSet(); endpoint.send(new GenericMessage(new EndpointPoller())); Message message1 = channel.receive(500); @@ -69,7 +69,7 @@ public class ByteStreamSourceTests { PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); + endpoint.setTarget(channel); endpoint.afterPropertiesSet(); endpoint.send(new GenericMessage(new EndpointPoller())); Message message1 = channel.receive(0); @@ -92,7 +92,7 @@ public class ByteStreamSourceTests { PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); + endpoint.setTarget(channel); endpoint.afterPropertiesSet(); endpoint.send(new GenericMessage(new EndpointPoller())); Message message1 = channel.receive(0); diff --git a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java index 0ebd999229..6a370adbed 100644 --- a/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java +++ b/org.springframework.integration.adapter/src/test/java/org/springframework/integration/adapter/stream/CharacterStreamSourceTests.java @@ -44,7 +44,7 @@ public class CharacterStreamSourceTests { PollingSchedule schedule = new PollingSchedule(1000); schedule.setInitialDelay(10000); SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); + endpoint.setTarget(channel); endpoint.afterPropertiesSet(); endpoint.send(new GenericMessage(new EndpointPoller())); Message message1 = channel.receive(0); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java index 32499987b1..29f520e1c8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java @@ -263,7 +263,7 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A private void configureEndpoint(AbstractEndpoint endpoint, String name, Object input, Schedule schedule) { endpoint.setName(name); if (input instanceof MessageChannel) { - endpoint.setInputChannel((MessageChannel) input); + endpoint.setSource((MessageChannel) input); } else if (input instanceof String) { endpoint.setInputChannelName((String) input); @@ -322,15 +322,12 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A if (endpoint.getOutputChannel() == null) { this.lookupOrCreateChannel(endpoint.getOutputChannelName()); } - try { - endpoint.afterPropertiesSet(); - } - catch (Exception e) { - throw new ConfigurationException("failed to initialize endpoint", e); - } MessageChannel channel = endpoint.getInputChannel(); if (channel == null) { channel = this.lookupOrCreateChannel(endpoint.getInputChannelName()); + if (channel != null) { + endpoint.setSource(channel); + } } if (channel != null && channel instanceof Subscribable) { ((Subscribable) channel).subscribe(endpoint); @@ -343,6 +340,12 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A Schedule schedule = endpoint.getSchedule(); EndpointTrigger trigger = new EndpointTrigger(schedule != null ? schedule : this.defaultPollerSchedule); trigger.addTarget(endpoint); + try { + endpoint.afterPropertiesSet(); + } + catch (Exception e) { + throw new ConfigurationException("failed to initialize endpoint '" + endpoint + "'", e); + } if (this.endpointTriggers.add(trigger)) { this.taskScheduler.schedule(trigger); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java index 5f80e1082d..fb57a4a9ed 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/SourceAnnotationPostProcessor.java @@ -74,7 +74,7 @@ public class SourceAnnotationPostProcessor extends AbstractAnnotationMethodPostP if (!StringUtils.hasText(outputChannelName)) { MessageChannel outputChannel = new DirectChannel(); this.getMessageBus().registerChannel(beanName + ".output", outputChannel); - endpoint.setOutputChannel(outputChannel); + endpoint.setTarget(outputChannel); } else { endpoint.setOutputChannelName(outputChannelName); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index e4c02aa189..2e71e25cdf 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -23,13 +23,17 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanNameAware; -import org.springframework.context.Lifecycle; import org.springframework.integration.channel.ChannelRegistry; +import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageRejectedException; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; +import org.springframework.integration.message.MessagingException; +import org.springframework.integration.message.Poller; +import org.springframework.integration.message.TargetInvoker; +import org.springframework.integration.message.selector.MessageSelector; import org.springframework.integration.scheduling.Schedule; /** @@ -37,7 +41,7 @@ import org.springframework.integration.scheduling.Schedule; * * @author Mark Fisher */ -public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware, Lifecycle { +public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware { protected final Log logger = LogFactory.getLog(this.getClass()); @@ -45,26 +49,26 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware private volatile String inputChannelName; - private MessageChannel inputChannel; - private volatile String outputChannelName; - private MessageChannel outputChannel; + private volatile MessageSource source; - private final List interceptors = new ArrayList(); + private volatile MessageTarget target; + + private volatile Poller poller; private volatile Schedule schedule; - private volatile EndpointTrigger trigger; + private final TargetInvoker targetInvoker = new TargetInvoker(); + + private volatile long sendTimeout; + + private volatile MessageSelector selector; + + private final List interceptors = new ArrayList(); private volatile ChannelRegistry channelRegistry; - private volatile boolean autoStartup = true; - - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - public String getName() { return this.name; @@ -86,12 +90,8 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware return this.schedule; } - public void setTrigger(EndpointTrigger trigger) { - this.trigger = trigger; - } - - public EndpointTrigger getTrigger() { - return this.trigger; + public void setPoller(Poller poller) { + this.poller = poller; } public void setInputChannelName(String inputChannelName) { @@ -102,17 +102,27 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware return this.inputChannelName; } - public void setInputChannel(MessageChannel channel) { - this.inputChannel = channel; - this.inputChannelName = channel.getName(); + public void setSource(MessageSource source) { + if (source instanceof MessageChannel) { + this.inputChannelName = ((MessageChannel) source).getName(); + } + this.source = source; } public MessageChannel getInputChannel() { - if (this.inputChannel == null && - (this.inputChannelName != null && this.channelRegistry != null)) { - this.inputChannel = this.channelRegistry.lookupChannel(this.inputChannelName); + if (this.source != null) { + if (this.source instanceof MessageChannel) { + return (MessageChannel) this.source; + } } - return this.inputChannel; + else if (this.inputChannelName != null && this.channelRegistry != null) { + MessageChannel inputChannel = this.channelRegistry.lookupChannel(this.inputChannelName); + if (inputChannel != null) { + this.source = inputChannel; + } + return inputChannel; + } + return null; } /** @@ -127,17 +137,31 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware return this.outputChannelName; } - public void setOutputChannel(MessageChannel outputChannel) { - this.outputChannel = outputChannel; - this.outputChannelName = outputChannel.getName(); + public void setTarget(MessageTarget target) { + if (target instanceof MessageChannel) { + this.outputChannelName = ((MessageChannel) target).getName(); + } + this.target = target; + } + + public void setSendTimeout(long sendTimeout) { + this.sendTimeout = sendTimeout; } public MessageChannel getOutputChannel() { - if (this.outputChannel == null && - (this.outputChannelName != null && this.channelRegistry != null)) { - this.outputChannel = this.channelRegistry.lookupChannel(this.outputChannelName); + if (this.target != null) { + if (this.target instanceof MessageChannel) { + return (MessageChannel) this.target; + } } - return this.outputChannel; + else if (this.outputChannelName != null && this.channelRegistry != null) { + MessageChannel outputChannel = this.channelRegistry.lookupChannel(this.outputChannelName); + if (outputChannel != null) { + this.target = outputChannel; + } + return outputChannel; + } + return null; } /** @@ -151,8 +175,8 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware return this.channelRegistry; } - public void setAutoStartup(boolean autoStartup) { - this.autoStartup = autoStartup; + public void setMessageSelector(MessageSelector selector) { + this.selector = selector; } public void addInterceptor(EndpointInterceptor interceptor) { @@ -166,10 +190,6 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware } } - public List getInterceptors() { - return this.interceptors; - } - public String toString() { return (this.name != null) ? this.name : super.toString(); } @@ -177,39 +197,31 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware protected void initialize() { } - public boolean isRunning() { - return this.running; - } - public void afterPropertiesSet() { - if (this.autoStartup) { - this.start(); + if (this.source != null && this.poller == null) { + this.poller = new DefaultEndpointPoller(); } - else { - this.initialize(); + if (this.target == null) { + this.target = this.getOutputChannel(); + } + if (this.target != null && this.target instanceof ChannelRegistryAware + && this.channelRegistry != null) { + ((ChannelRegistryAware) this.target).setChannelRegistry(this.channelRegistry); } - } - - public void start() { this.initialize(); - synchronized (this.lifecycleMonitor) { - if (this.running) { - return; - } - this.running = true; - } - } - - public void stop() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - return; - } - this.running = false; - } } public final boolean send(Message message) { + if (message == null || message.getPayload() == null) { + throw new IllegalArgumentException("Message and its payload must not be null."); + } + if (logger.isDebugEnabled()) { + logger.debug("endpoint '" + this + "' handling message: " + message); + } + if (message.getPayload() instanceof EndpointVisitor) { + ((EndpointVisitor) message.getPayload()).visitEndpoint(this); + return true; + } return this.send(message, 0); } @@ -239,27 +251,46 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware } private boolean doSend(Message message) { - if (message == null || message.getPayload() == null) { - throw new IllegalArgumentException("Message and its payload must not be null."); - } - if (logger.isDebugEnabled()) { - logger.debug("endpoint '" + this + "' handling message: " + message); - } - if (!this.isRunning()) { - throw new MessageHandlerNotRunningException(message); - } - if (message.getPayload() instanceof EndpointVisitor) { - ((EndpointVisitor) message.getPayload()).visitEndpoint(this); - return true; - } if (!this.supports(message)) { throw new MessageRejectedException(message, "unsupported message"); } - return this.handleMessage(message); + Message result = this.handleMessage(message); + if (result != null) { + return this.targetInvoker.invoke(this.target, result, this.sendTimeout); + } + return true; } - protected abstract boolean supports(Message message); + public final boolean poll() { + if (this.poller == null) { + this.afterPropertiesSet(); + if (this.poller == null) { + throw new MessagingException("endpoint '" + this + "' has no poller"); + } + } + if (this.source == null) { + throw new MessagingException("endpoint '" + this + "' has no source"); + } + int result = this.poller.poll(this.source, new MessageTarget() { + public boolean send(Message message) { + return AbstractEndpoint.this.send(message, 0); + } + }); + return (result > 0); + } - protected abstract boolean handleMessage(Message message); + protected boolean supports(Message message) { + if (this.selector != null && !this.selector.accept(message)) { + if (logger.isDebugEnabled()) { + logger.debug("selector for endpoint '" + this + "' rejected message: " + message); + } + return false; + } + return true; + } + + protected Message handleMessage(Message message) { + return message; + } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/HandlerEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/HandlerEndpoint.java index 09d298ca68..5a1e877def 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/HandlerEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/HandlerEndpoint.java @@ -25,7 +25,6 @@ import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.message.MessageHeader; -import org.springframework.integration.message.MessageTarget; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -35,7 +34,7 @@ import org.springframework.util.StringUtils; * * @author Mark Fisher */ -public class HandlerEndpoint extends TargetEndpoint { +public class HandlerEndpoint extends AbstractEndpoint { private volatile MessageHandler handler; @@ -80,7 +79,6 @@ public class HandlerEndpoint extends TargetEndpoint { if (this.handler instanceof ChannelRegistryAware) { ((ChannelRegistryAware) this.handler).setChannelRegistry(this.getChannelRegistry()); } - super.setTarget(new HandlerInvokingTarget(this.handler, this.replyHandler)); super.initialize(); } @@ -118,31 +116,16 @@ public class HandlerEndpoint extends TargetEndpoint { return null; } - - private static class HandlerInvokingTarget implements MessageTarget { - - private final MessageHandler handler; - - private final ReplyHandler replyHandler; - - - public HandlerInvokingTarget(MessageHandler handler, ReplyHandler replyHandler) { - this.handler = handler; - this.replyHandler = replyHandler; - } - - - public boolean send(Message message) { - Message replyMessage = this.handler.handle(message); - if (replyMessage != null) { - if (replyMessage.getHeader().getCorrelationId() == null) { - replyMessage.getHeader().setCorrelationId(message.getId()); - } - this.replyHandler.handle(replyMessage, message.getHeader()); + @Override + protected Message handleMessage(Message message) { + Message replyMessage = this.handler.handle(message); + if (replyMessage != null) { + if (replyMessage.getHeader().getCorrelationId() == null) { + replyMessage.getHeader().setCorrelationId(message.getId()); } - return true; + this.replyHandler.handle(replyMessage, message.getHeader()); } - + return null; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java index 846502d0f9..8f6ab4714e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageEndpoint.java @@ -19,6 +19,7 @@ package org.springframework.integration.endpoint; import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.scheduling.Schedule; @@ -35,6 +36,10 @@ public interface MessageEndpoint extends MessageTarget, ChannelRegistryAware, In Schedule getSchedule(); + void setSource(MessageSource source); + + void setTarget(MessageTarget target); + String getInputChannelName(); MessageChannel getInputChannel(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java index d7a488646a..00e5d9057c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourceEndpoint.java @@ -18,10 +18,6 @@ package org.springframework.integration.endpoint; import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.message.BlockingSource; -import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageDeliveryAware; -import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageSource; import org.springframework.util.Assert; @@ -33,27 +29,12 @@ import org.springframework.util.Assert; */ public class SourceEndpoint extends AbstractEndpoint { - private final MessageSource source; - - private volatile long receiveTimeout = 5000; - - private volatile long sendTimeout = -1; - - public SourceEndpoint(MessageSource source) { Assert.notNull(source, "source must not be null"); - this.source = source; + this.setSource(source); } - public void setReceiveTimeout(long receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public void setSendTimeout(long sendTimeout) { - this.sendTimeout = sendTimeout; - } - @Override public void initialize() { if (this.getOutputChannelName() == null && this.getOutputChannel() == null) { @@ -62,32 +43,4 @@ public class SourceEndpoint extends AbstractEndpoint { } } - @Override - protected final boolean supports(Message message) { - return false; - } - - @Override - protected final boolean handleMessage(Message message) { - return false; - } - - public boolean poll() { - Message message = (this.source instanceof BlockingSource && this.receiveTimeout >= 0) ? - ((BlockingSource) this.source).receive(this.receiveTimeout) : this.source.receive(); - if (message == null) { - return false; - } - boolean sent = this.getOutputChannel().send(message, this.sendTimeout); - if (this.source instanceof MessageDeliveryAware) { - if (sent) { - ((MessageDeliveryAware) this.source).onSend(message); - } - else { - ((MessageDeliveryAware) this.source).onFailure(new MessageDeliveryException(message, "failed to send message")); - } - } - return sent; - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java index 4b52d20db1..f730584d43 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/TargetEndpoint.java @@ -16,12 +16,7 @@ package org.springframework.integration.endpoint; -import org.springframework.integration.channel.ChannelRegistryAware; -import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.message.BlockingTarget; -import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageTarget; -import org.springframework.integration.message.selector.MessageSelector; import org.springframework.util.Assert; /** @@ -31,90 +26,12 @@ import org.springframework.util.Assert; */ public class TargetEndpoint extends AbstractEndpoint { - private volatile MessageTarget target; - - private volatile MessageSelector selector; - - private volatile long receiveTimeout = 5000; - - private volatile long sendTimeout = 0; - - private volatile boolean initialized; - - private final Object initializationMonitor = new Object(); - - public TargetEndpoint() { } public TargetEndpoint(MessageTarget target) { Assert.notNull(target, "target must not be null"); - this.target = target; - } - - - public MessageTarget getTarget() { - return this.target; - } - - public void setTarget(MessageTarget target) { - Assert.notNull(target, "target must not be null"); - this.target = target; - } - - public void setMessageSelector(MessageSelector selector) { - this.selector = selector; - } - - public void setReceiveTimeout(long receiveTimeout) { - this.receiveTimeout = receiveTimeout; - } - - public void setSendTimeout(long sendTimeout) { - this.sendTimeout = sendTimeout; - } - - protected void initialize() { - synchronized (this.initializationMonitor) { - if (this.initialized) { - return; - } - if (this.target instanceof ChannelRegistryAware && this.getChannelRegistry() != null) { - ((ChannelRegistryAware) this.target).setChannelRegistry(this.getChannelRegistry()); - } - this.initialized = true; - } - } - - @Override - protected boolean supports(Message message) { - if (this.selector != null && !this.selector.accept(message)) { - if (logger.isDebugEnabled()) { - logger.debug("selector for endpoint '" + this + "' rejected message: " + message); - } - return false; - } - return true; - } - - @Override - protected final boolean handleMessage(Message message) { - return (this.sendTimeout >= 0 && this.target instanceof BlockingTarget) ? - ((BlockingTarget) this.target).send(message) : this.target.send(message); - } - - public final boolean poll() { - MessageChannel channel = this.getInputChannel(); - if (channel != null) { - Message receivedMessage = channel.receive(this.receiveTimeout); - if (receivedMessage != null) { - return this.handleMessage(receivedMessage); - } - } - else if (logger.isDebugEnabled()) { - logger.debug("TargetEndpoint unable to resolve channel '" + this.getInputChannelName() + "'"); - } - return false; + this.setTarget(target); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java index fc1907e20c..9e8d009c2c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/RequestReplyTemplate.java @@ -213,7 +213,7 @@ public class RequestReplyTemplate implements MessageBusAware { } ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(10); HandlerEndpoint endpoint = new HandlerEndpoint(correlator); - endpoint.setInputChannel(this.replyChannel); + endpoint.setSource(this.replyChannel); endpoint.setName("internal.correlator." + this); this.endpointRegistry.registerEndpoint(endpoint); this.replyMessageCorrelator = correlator; @@ -249,11 +249,11 @@ public class RequestReplyTemplate implements MessageBusAware { public void setName(String name) { } - public Message receive() { + public Message receive() { return null; } - public Message receive(long timeout) { + public Message receive(long timeout) { return null; } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 2becec2dd9..9dbf28bb68 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -189,7 +189,7 @@ public class DefaultMessageBusTests { MessageBus bus = new DefaultMessageBus(); CountDownLatch latch = new CountDownLatch(1); SourceEndpoint sourceEndpoint = new SourceEndpoint(new FailingSource(latch)); - sourceEndpoint.setOutputChannel(new QueueChannel()); + sourceEndpoint.setTarget(new QueueChannel()); sourceEndpoint.setSchedule(new PollingSchedule(1000)); sourceEndpoint.setName("testEndpoint"); bus.registerEndpoint(sourceEndpoint); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml index 6ce111ef9d..cf33ea8556 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml @@ -12,7 +12,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java index 6369014852..33f57531ad 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java @@ -18,11 +18,14 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; +import java.util.List; + import org.junit.Test; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.MessageChannel; -import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.endpoint.EndpointPoller; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.SourceEndpoint; @@ -83,12 +86,15 @@ public class EndpointInterceptorTests { } + @SuppressWarnings("unchecked") private static void testInterceptors(MessageEndpoint endpoint, ClassPathXmlApplicationContext context, boolean innerBeans) { TestPreSendInterceptor preInterceptor = null; TestAroundSendEndpointInterceptor aroundInterceptor = null; if (innerBeans) { - preInterceptor = (TestPreSendInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(0); - aroundInterceptor = (TestAroundSendEndpointInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(1); + DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + List interceptors = (List) accessor.getPropertyValue("interceptors"); + preInterceptor = (TestPreSendInterceptor) interceptors.get(0); + aroundInterceptor = (TestAroundSendEndpointInterceptor) interceptors.get(1); } else { preInterceptor = (TestPreSendInterceptor) context.getBean("preInterceptor"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index b0a01e9e90..e9629b6757 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -25,6 +25,7 @@ import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -175,6 +176,7 @@ public class MessagingAnnotationPostProcessorTests { } @Test + @SuppressWarnings("unchecked") public void testConcurrencyAnnotationWithValues() { MessageBus messageBus = new DefaultMessageBus(); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(messageBus); @@ -182,9 +184,10 @@ public class MessagingAnnotationPostProcessorTests { ConcurrencyAnnotationTestBean testBean = new ConcurrencyAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); - assertEquals(1, endpoint.getInterceptors().size()); - EndpointInterceptor interceptor = endpoint.getInterceptors().get(0); - DirectFieldAccessor accessor = new DirectFieldAccessor(interceptor); + DirectFieldAccessor accessor = new DirectFieldAccessor(endpoint); + List interceptors = (List) accessor.getPropertyValue("interceptors"); + assertEquals(1, interceptors.size()); + EndpointInterceptor interceptor = interceptors.get(0); accessor = new DirectFieldAccessor(interceptor); ConcurrentTaskExecutor cte = (ConcurrentTaskExecutor) accessor.getPropertyValue("executor"); ThreadPoolExecutor executor = (ThreadPoolExecutor) cte.getConcurrentExecutor(); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index aa13cac0b3..c374817bf0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -52,9 +52,7 @@ public class BroadcastingDispatcherTests { private static MessageTarget createEndpoint(MessageHandler handler) { - HandlerEndpoint endpoint = new HandlerEndpoint(handler); - endpoint.start(); - return endpoint; + return new HandlerEndpoint(handler); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java index 4207be209f..1d43cd7956 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/dispatcher/SimpleDispatcherTests.java @@ -61,9 +61,7 @@ public class SimpleDispatcherTests { private static MessageTarget createEndpoint(MessageHandler handler) { - HandlerEndpoint endpoint = new HandlerEndpoint(handler); - endpoint.start(); - return endpoint; + return new HandlerEndpoint(handler); } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java index 702664ec9e..ac5e62307e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/HandlerEndpointTests.java @@ -33,7 +33,6 @@ import org.springframework.integration.channel.DefaultChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.handler.MessageHandler; -import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.handler.TestHandlers; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageRejectedException; @@ -59,9 +58,7 @@ public class HandlerEndpointTests { HandlerEndpoint endpoint = new HandlerEndpoint(handler); endpoint.setChannelRegistry(channelRegistry); endpoint.setOutputChannelName("replyChannel"); - endpoint.start(); endpoint.send(new StringMessage(1, "test")); - endpoint.stop(); Message reply = replyChannel.receive(50); assertNotNull(reply); assertEquals("hello test", reply.getPayload()); @@ -76,11 +73,9 @@ public class HandlerEndpointTests { } }; HandlerEndpoint endpoint = new HandlerEndpoint(handler); - endpoint.start(); StringMessage testMessage = new StringMessage(1, "test"); testMessage.getHeader().setReturnAddress(replyChannel); endpoint.send(testMessage); - endpoint.stop(); Message reply = replyChannel.receive(50); assertNotNull(reply); assertEquals("hello test", reply.getPayload()); @@ -98,11 +93,9 @@ public class HandlerEndpointTests { }; HandlerEndpoint endpoint = new HandlerEndpoint(handler); endpoint.setChannelRegistry(channelRegistry); - endpoint.start(); StringMessage testMessage = new StringMessage(1, "test"); testMessage.getHeader().setReturnAddress("replyChannel"); endpoint.send(testMessage); - endpoint.stop(); Message reply = replyChannel.receive(50); assertNotNull(reply); assertEquals("hello test", reply.getPayload()); @@ -121,7 +114,6 @@ public class HandlerEndpointTests { }; HandlerEndpoint endpoint = new HandlerEndpoint(handler); endpoint.setChannelRegistry(channelRegistry); - endpoint.start(); StringMessage testMessage = new StringMessage("test"); testMessage.getHeader().setReturnAddress(replyChannel1); endpoint.send(testMessage); @@ -137,7 +129,6 @@ public class HandlerEndpointTests { reply2 = replyChannel2.receive(0); assertNotNull(reply2); assertEquals("hello test", reply2.getPayload()); - endpoint.stop(); } @Test @@ -155,39 +146,13 @@ public class HandlerEndpointTests { HandlerEndpoint endpoint = new HandlerEndpoint(handler); endpoint.setChannelRegistry(channelRegistry); endpoint.setOutputChannelName("replyChannel"); - endpoint.start(); endpoint.send(new StringMessage(1, "test")); - endpoint.stop(); latch.await(500, TimeUnit.MILLISECONDS); assertEquals("handler should have been invoked within allotted time", 0, latch.getCount()); Message reply = replyChannel.receive(0); assertNull(reply); } - @Test(expected=MessageHandlerNotRunningException.class) - public void testEndpointDoesNotHandleMessagesWhenNotYetStarted() { - HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler()); - endpoint.send(new StringMessage("test")); - } - - @Test - public void testEndpointDoesNotHandleMessagesAfterBeingStopped() { - AtomicInteger counter = new AtomicInteger(); - HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.countingHandler(counter)); - boolean exceptionThrown = false; - try { - endpoint.start(); - endpoint.send(new StringMessage("test1")); - endpoint.stop(); - endpoint.send(new StringMessage("test2")); - } - catch (MessageHandlerNotRunningException e) { - exceptionThrown = true; - } - assertEquals("handler should have been invoked exactly once", 1, counter.get()); - assertTrue(exceptionThrown); - } - @Test(expected=MessageRejectedException.class) public void testEndpointWithSelectorRejecting() { HandlerEndpoint endpoint = new HandlerEndpoint(TestHandlers.nullHandler()); @@ -196,7 +161,6 @@ public class HandlerEndpointTests { return false; } }); - endpoint.start(); endpoint.send(new StringMessage("test")); } @@ -209,11 +173,9 @@ public class HandlerEndpointTests { return true; } }); - endpoint.start(); endpoint.send(new StringMessage("test")); latch.await(100, TimeUnit.MILLISECONDS); assertEquals("handler should have been invoked", 0, latch.getCount()); - endpoint.stop(); } @Test @@ -234,7 +196,6 @@ public class HandlerEndpointTests { } }); endpoint.setMessageSelector(selectorChain); - endpoint.start(); boolean exceptionWasThrown = false; try { endpoint.send(new StringMessage("test")); @@ -244,7 +205,6 @@ public class HandlerEndpointTests { } assertTrue(exceptionWasThrown); assertEquals("only the first selector should have been invoked", 1, counter.get()); - endpoint.stop(); } @Test @@ -266,7 +226,6 @@ public class HandlerEndpointTests { } }); endpoint.setMessageSelector(selectorChain); - endpoint.start(); boolean exceptionWasThrown = false; try { endpoint.send(new StringMessage("test")); @@ -277,7 +236,6 @@ public class HandlerEndpointTests { assertTrue(exceptionWasThrown); assertEquals("both selectors should have been invoked", 2, selectorCounter.get()); assertEquals("the handler should not have been invoked", 0, handlerCounter.get()); - endpoint.stop(); } @Test @@ -298,10 +256,8 @@ public class HandlerEndpointTests { } }); endpoint.setMessageSelector(selectorChain); - endpoint.start(); assertTrue(endpoint.send(new StringMessage("test"))); assertEquals("both selectors and handler should have been invoked", 3, counter.get()); - endpoint.stop(); } @Test @@ -312,7 +268,6 @@ public class HandlerEndpointTests { return message; } }); - endpoint.start(); Message message = new StringMessage("test"); message.getHeader().setReturnAddress(replyChannel); endpoint.send(message); @@ -329,7 +284,6 @@ public class HandlerEndpointTests { return message; } }); - endpoint.start(); Message message = new StringMessage("test"); message.getHeader().setReturnAddress(replyChannel); endpoint.send(message); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java index 57794ddb9d..c22bc4c150 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/SourceEndpointTests.java @@ -26,7 +26,6 @@ import org.junit.Test; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.message.GenericMessage; import org.springframework.integration.message.Message; -import org.springframework.integration.message.MessageHandlingException; import org.springframework.integration.message.MessageSource; /** @@ -39,7 +38,7 @@ public class SourceEndpointTests { TestSource source = new TestSource("testing", 1); QueueChannel channel = new QueueChannel(); SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); + endpoint.setTarget(channel); endpoint.afterPropertiesSet(); endpoint.send(new GenericMessage(new EndpointPoller())); Message message = channel.receive(1000); @@ -47,16 +46,6 @@ public class SourceEndpointTests { assertEquals("testing.1", message.getPayload()); } - @Test(expected = MessageHandlingException.class) - public void testAutoStartupDisabled() { - TestSource source = new TestSource("testing", 1); - QueueChannel channel = new QueueChannel(); - SourceEndpoint endpoint = new SourceEndpoint(source); - endpoint.setOutputChannel(channel); - endpoint.setAutoStartup(false); - endpoint.afterPropertiesSet(); - endpoint.send(new GenericMessage(new EndpointPoller())); - } private static class TestSource implements MessageSource { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml index 868ad5456f..24cb2973d0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/handler/adapterTests.xml @@ -19,7 +19,7 @@ - + @@ -34,7 +34,7 @@ - +