Added AbstractMessageConsumingEndpoint. MessageDispatchers now expect MessageConsumer instances as subscribers, and the MessageEndpoint no longer has a send() method or a getSource() method. All consumer endpoints now use 'inputChannel' as the property (instead of source). The MessageBus is less involved in endpoint activation now, since endpoints that need to poll a channel can create, configure, and schedule their own poller.

This commit is contained in:
Mark Fisher
2008-09-07 21:04:50 +00:00
parent 7a92660993
commit 35e744e60a
57 changed files with 758 additions and 670 deletions

View File

@@ -23,7 +23,7 @@ import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessagingException;
@@ -32,7 +32,7 @@ import org.springframework.integration.message.MessagingException;
*
* @author Mark Fisher
*/
public class ByteStreamTarget extends AbstractEndpoint {
public class ByteStreamTarget extends AbstractMessageConsumingEndpoint {
private final Log logger = LogFactory.getLog(this.getClass());
@@ -54,13 +54,13 @@ public class ByteStreamTarget extends AbstractEndpoint {
@Override
public boolean sendInternal(Message message) {
public void processMessage(Message<?> message) {
Object payload = message.getPayload();
if (payload == null) {
if (logger.isWarnEnabled()) {
logger.warn(this.getClass().getSimpleName() + " received null object");
}
return false;
return;
}
try {
if (payload instanceof String) {
@@ -74,7 +74,6 @@ public class ByteStreamTarget extends AbstractEndpoint {
" only supports byte array and String-based messages");
}
this.stream.flush();
return true;
}
catch (IOException e) {
throw new MessagingException("IO failure occurred in target", e);

View File

@@ -27,7 +27,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessagingException;
import org.springframework.util.Assert;
@@ -41,7 +41,7 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
public class CharacterStreamTarget extends AbstractEndpoint {
public class CharacterStreamTarget extends AbstractMessageConsumingEndpoint {
private final Log logger = LogFactory.getLog(this.getClass());
@@ -118,13 +118,13 @@ public class CharacterStreamTarget extends AbstractEndpoint {
}
@Override
public boolean sendInternal(Message message) {
public void processMessage(Message<?> message) {
Object payload = message.getPayload();
if (payload == null) {
if (logger.isWarnEnabled()) {
logger.warn("target received null payload");
}
return false;
return;
}
try {
if (payload instanceof String) {
@@ -143,7 +143,6 @@ public class CharacterStreamTarget extends AbstractEndpoint {
writer.newLine();
}
writer.flush();
return true;
}
catch (IOException e) {
throw new MessagingException("IO failure occurred in target", e);

View File

@@ -72,10 +72,10 @@ public class ConsoleTargetParser extends AbstractSingleBeanDefinitionParser {
}
String channelName = element.getAttribute("channel");
if (StringUtils.hasText(channelName)) {
builder.addPropertyReference("source", channelName);
builder.addPropertyReference("inputChannel", channelName);
}
else {
builder.addPropertyReference("source", this.createDirectChannel(element, parserContext));
builder.addPropertyReference("inputChannel", this.createDirectChannel(element, parserContext));
}
}

View File

@@ -51,7 +51,7 @@ public class ByteStreamTargetTests {
public void testSingleByteArray() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamTarget target = new ByteStreamTarget(stream);
target.send(new GenericMessage<byte[]>(new byte[] {1,2,3}));
target.onMessage(new GenericMessage<byte[]>(new byte[] {1,2,3}));
byte[] result = stream.toByteArray();
assertEquals(3, result.length);
assertEquals(1, result[0]);
@@ -63,7 +63,7 @@ public class ByteStreamTargetTests {
public void testSingleString() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamTarget target = new ByteStreamTarget(stream);
target.send(new StringMessage("foo"));
target.onMessage(new StringMessage("foo"));
byte[] result = stream.toByteArray();
assertEquals(3, result.length);
assertEquals("foo", new String(result));

View File

@@ -50,7 +50,7 @@ public class CharacterStreamTargetTests {
public void testSingleString() {
StringWriter writer = new StringWriter();
CharacterStreamTarget target = new CharacterStreamTarget(writer);
target.send(new StringMessage("foo"));
target.onMessage(new StringMessage("foo"));
assertEquals("foo", writer.toString());
}

View File

@@ -72,7 +72,7 @@ public class ConsoleTargetParserTests {
Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding());
assertEquals(Charset.defaultCharset(), writerCharset);
this.resetStreams();
target.send(new StringMessage("foo"));
target.onMessage(new StringMessage("foo"));
assertEquals("foo", out.toString());
assertEquals("", err.toString());
}
@@ -92,7 +92,7 @@ public class ConsoleTargetParserTests {
Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding());
assertEquals(Charset.forName("UTF-8"), writerCharset);
this.resetStreams();
target.send(new StringMessage("bar"));
target.onMessage(new StringMessage("bar"));
assertEquals("bar", out.toString());
assertEquals("", err.toString());
}
@@ -128,7 +128,7 @@ public class ConsoleTargetParserTests {
Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding());
assertEquals(Charset.defaultCharset(), writerCharset);
this.resetStreams();
target.send(new StringMessage("bad"));
target.onMessage(new StringMessage("bad"));
assertEquals("", out.toString());
assertEquals("bad", err.toString());
}
@@ -148,7 +148,7 @@ public class ConsoleTargetParserTests {
Charset writerCharset = Charset.forName(((OutputStreamWriter) writer).getEncoding());
assertEquals(Charset.defaultCharset(), writerCharset);
this.resetStreams();
target.send(new StringMessage("foo"));
target.onMessage(new StringMessage("foo"));
assertEquals("foo\n", out.toString());
}

View File

@@ -57,7 +57,7 @@ public class WebServiceHandlerParser extends AbstractSingleBeanDefinitionParser
}
builder.addConstructorArgValue(uri);
String inputChannel = element.getAttribute("input-channel");
builder.addPropertyReference("source", inputChannel);
builder.addPropertyReference("inputChannel", inputChannel);
String outputChannel = element.getAttribute("output-channel");
if (StringUtils.hasText(outputChannel)) {
builder.addPropertyReference("outputChannel", outputChannel);

View File

@@ -151,7 +151,8 @@ public abstract class AbstractMessageBarrierEndpoint extends AbstractInOutEndpoi
* Initialize this endpoint.
*/
@Override
protected void initialize() {
protected void initialize() throws Exception {
super.initialize();
this.trackedCorrelationIds = new ArrayBlockingQueue<Object>(this.trackedCorrelationIdCapacity);
this.executor.scheduleWithFixedDelay(new ReaperTask(),
this.reaperInterval, this.reaperInterval, TimeUnit.MILLISECONDS);

View File

@@ -44,18 +44,12 @@ import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.DefaultChannelRegistry;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.AbstractPoller;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.DefaultEndpointRegistry;
import org.springframework.integration.endpoint.EndpointRegistry;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.MessagingGateway;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.integration.scheduling.TaskSchedulerAware;
import org.springframework.integration.scheduling.spi.ProviderTaskScheduler;
import org.springframework.integration.scheduling.spi.SimpleScheduleServiceProvider;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
@@ -78,14 +72,10 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
private final EndpointRegistry endpointRegistry = new DefaultEndpointRegistry();
private final Set<AbstractPoller> pollers = new CopyOnWriteArraySet<AbstractPoller>();
private volatile Schedule defaultPollerSchedule = new PollingSchedule(0);
private final List<Lifecycle> lifecycleEndpoints = new CopyOnWriteArrayList<Lifecycle>();
private final MessageBusInterceptorsList interceptors = new MessageBusInterceptorsList();
private final Set<Lifecycle> lifecycleGateways = new CopyOnWriteArraySet<Lifecycle>();
private volatile TaskScheduler taskScheduler;
private volatile ApplicationContext applicationContext;
@@ -263,39 +253,46 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
}
}
private void deactivateEndpoints() {
Set<String> endpointNames = this.endpointRegistry.getEndpointNames();
for (String name : endpointNames) {
MessageEndpoint endpoint = this.endpointRegistry.lookupEndpoint(name);
if (endpoint != null) {
this.deactivateEndpoint(endpoint);
}
}
}
private void activateEndpoint(MessageEndpoint endpoint) {
Assert.notNull(endpoint, "'endpoint' must not be null");
if (endpoint instanceof ChannelRegistryAware) {
((ChannelRegistryAware) endpoint).setChannelRegistry(this);
}
MessageSource<?> source = endpoint.getSource();
if (source == null) {
throw new ConfigurationException("endpoint '" + endpoint + "' has no source");
if (endpoint instanceof TaskSchedulerAware) {
((TaskSchedulerAware) endpoint).setTaskScheduler(this.taskScheduler);
}
if (source instanceof SubscribableSource) {
((SubscribableSource) source).subscribe(endpoint);
if (source instanceof AbstractPoller) {
AbstractPoller poller = (AbstractPoller) source;
this.pollers.add(poller);
this.taskScheduler.schedule(poller);
}
return;
}
else if (source instanceof PollableChannel) {
ChannelPoller poller = new ChannelPoller((PollableChannel) source, this.defaultPollerSchedule);
poller.subscribe(endpoint);
this.pollers.add(poller);
this.taskScheduler.schedule(poller);
if (endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).start();
}
if (logger.isInfoEnabled()) {
logger.info("activated subscription to channel '"
+ source + "' for endpoint '" + endpoint + "'");
logger.info("activated endpoint '" + endpoint + "'");
}
}
public void deactivateEndpoint(MessageEndpoint endpoint) {
Assert.notNull(endpoint, "'endpoint' must not be null");
if (endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).stop();
if (this.logger.isInfoEnabled()) {
logger.info("deactivated endpoint '" + endpoint + "'");
}
}
}
// TODO: once gateways are endpoints, remove this
private void registerGateway(String name, MessagingGateway gateway) {
if (gateway instanceof Lifecycle) {
this.lifecycleEndpoints.add((Lifecycle) gateway);
this.lifecycleGateways.add((Lifecycle) gateway);
if (this.isRunning()) {
((Lifecycle) gateway).start();
}
@@ -305,19 +302,6 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
}
}
public void deactivateEndpoint(MessageEndpoint endpoint) {
Assert.notNull(endpoint, "'endpoint' must not be null");
for (AbstractPoller poller : this.pollers) {
boolean removed = ((AbstractPoller) poller).unsubscribe(endpoint);
if (removed && this.logger.isInfoEnabled()) {
logger.info("unsubscribed endpoint '" + endpoint + "' from poller '" + poller + "'");
}
}
if (endpoint instanceof Lifecycle) {
((Lifecycle) endpoint).stop();
}
}
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.running;
@@ -335,14 +319,11 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
this.starting = true;
synchronized (this.lifecycleMonitor) {
this.activateEndpoints();
for (Lifecycle gateway : this.lifecycleGateways) {
gateway.start();
}
this.taskScheduler.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel()));
this.taskScheduler.start();
for (Lifecycle endpoint : this.lifecycleEndpoints) {
endpoint.start();
if (logger.isInfoEnabled()) {
logger.info("started endpoint '" + endpoint + "'");
}
}
}
this.running = true;
this.starting = false;
@@ -358,14 +339,12 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
}
this.interceptors.preStop();
synchronized (this.lifecycleMonitor) {
this.deactivateEndpoints();
for (Lifecycle gateway : this.lifecycleGateways) {
gateway.stop();
}
this.running = false;
this.taskScheduler.stop();
for (Lifecycle endpoint : this.lifecycleEndpoints) {
endpoint.stop();
if (logger.isInfoEnabled()) {
logger.info("stopped endpoint '" + endpoint + "'");
}
}
}
this.interceptors.postStop();
if (logger.isInfoEnabled()) {

View File

@@ -19,6 +19,7 @@ package org.springframework.integration.channel;
import org.springframework.integration.dispatcher.SimpleDispatcher;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.SubscribableSource;
/**
@@ -33,12 +34,12 @@ public class DirectChannel extends AbstractMessageChannel implements Subscribabl
private final SimpleDispatcher dispatcher = new SimpleDispatcher();
public boolean subscribe(MessageEndpoint endpoint) {
return this.dispatcher.subscribe(endpoint);
public boolean subscribe(MessageConsumer consumer) {
return this.dispatcher.subscribe(consumer);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.dispatcher.unsubscribe(endpoint);
public boolean unsubscribe(MessageConsumer consumer) {
return this.dispatcher.unsubscribe(consumer);
}
@Override

View File

@@ -18,8 +18,8 @@ package org.springframework.integration.channel;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.SubscribableSource;
/**
@@ -48,12 +48,12 @@ public class PublishSubscribeChannel extends AbstractMessageChannel implements S
this.dispatcher.setApplySequence(applySequence);
}
public boolean subscribe(MessageEndpoint endpoint) {
return this.dispatcher.subscribe(endpoint);
public boolean subscribe(MessageConsumer consumer) {
return this.dispatcher.subscribe(consumer);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.dispatcher.unsubscribe(endpoint);
public boolean unsubscribe(MessageConsumer consumer) {
return this.dispatcher.unsubscribe(consumer);
}
@Override

View File

@@ -89,12 +89,13 @@ public abstract class AbstractEndpointParser extends AbstractSingleBeanDefinitio
}
Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT);
if (pollerElement != null) {
String pollerBeanName = IntegrationNamespaceUtils.parseChannelPoller(inputChannel, pollerElement, parserContext);
builder.addPropertyReference("source", pollerBeanName);
}
else {
builder.addPropertyReference("source", inputChannel);
IntegrationNamespaceUtils.configureSchedule(pollerElement, builder);
Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional");
if (txElement != null) {
IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder);
}
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, SELECTOR_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, ERROR_HANDLER_ATTRIBUTE);

View File

@@ -73,19 +73,21 @@ public class ChannelAdapterParser extends AbstractBeanDefinitionParser {
source = BeanDefinitionReaderUtils.registerWithGeneratedName(invokerBuilder.getBeanDefinition(), parserContext.getRegistry());
}
adapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(InboundChannelAdapter.class);
if (pollerElement != null) {
String pollerBeanName = IntegrationNamespaceUtils.parseSourcePoller(source, pollerElement, parserContext);
adapterBuilder.addPropertyReference("source", pollerBeanName);
}
else {
adapterBuilder.addPropertyReference("source", source);
}
adapterBuilder.addPropertyReference("source", source);
if (StringUtils.hasText(channelName)) {
adapterBuilder.addPropertyReference("channel", channelName);
}
else {
adapterBuilder.addPropertyReference("channel", this.createDirectChannel(element, parserContext));
}
if (pollerElement != null) {
IntegrationNamespaceUtils.configureSchedule(pollerElement, adapterBuilder);
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, pollerElement, "max-messages-per-poll");
Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional");
if (txElement != null) {
IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder);
}
}
}
else if (StringUtils.hasText(target)) {
if (StringUtils.hasText(methodName)) {
@@ -100,14 +102,17 @@ public class ChannelAdapterParser extends AbstractBeanDefinitionParser {
if (!StringUtils.hasText(channelName)) {
throw new ConfigurationException("outbound channel-adapter with a 'poller' requires a 'channel' to poll");
}
String pollerBeanName = IntegrationNamespaceUtils.parseChannelPoller(channelName, pollerElement, parserContext);
adapterBuilder.addPropertyReference("source", pollerBeanName);
IntegrationNamespaceUtils.configureSchedule(pollerElement, adapterBuilder);
Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional");
if (txElement != null) {
IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder);
}
}
else if (StringUtils.hasText(channelName)) {
adapterBuilder.addPropertyReference("source", channelName);
if (StringUtils.hasText(channelName)) {
adapterBuilder.addPropertyReference("inputChannel", channelName);
}
else {
adapterBuilder.addPropertyReference("source",
adapterBuilder.addPropertyReference("inputChannel",
this.createDirectChannel(element, parserContext));
}
}

View File

@@ -21,19 +21,15 @@ import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.Conventions;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.SourcePoller;
import org.springframework.integration.scheduling.CronSchedule;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;
/**
* Shared utility methods for integration namespace parsers.
@@ -138,71 +134,54 @@ public abstract class IntegrationNamespaceUtils {
}
/**
* Parse a "poller" element to create a ChannelPoller and return the bean name of the poller instance.
* Parse a "poller" element to create a Schedule and add it to the property values of the target builder.
*
* @param channelBeanName the name of the PollableChannel bean
* @param element the "poller" element to parse
* @param parserContext the parserContext for registering a newly created bean definition
* @return the name of the ChannelPoller bean definition
* @param pollerElement the "poller" element to parse
* @param targetBuilder the builder that expects the "schedule" property
*/
public static String parseChannelPoller(String channelBeanName, Element element, ParserContext parserContext) {
return parsePoller(channelBeanName, element, parserContext, true);
}
/**
* Parse a "poller" element to create a SourcePoller and return the bean name of the poller instance.
*
* @param sourceBeanName the name of the PollableSource bean
* @param element the "poller" element to parse
* @param parserContext the parserContext for registering a newly created bean definition
* @return the name of the poller bean definition
*/
public static String parseSourcePoller(String sourceBeanName, Element element, ParserContext parserContext) {
return parsePoller(sourceBeanName, element, parserContext, false);
}
private static String parsePoller(String sourceBeanName, Element element, ParserContext parserContext, boolean isChannel) {
Class<?> beanClass = isChannel ? ChannelPoller.class : SourcePoller.class;
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClass);
public static void configureSchedule(Element pollerElement, BeanDefinitionBuilder targetBuilder) {
Schedule schedule = null;
if (!(StringUtils.hasText(element.getAttribute("period")) ^ StringUtils.hasText(element.getAttribute("cron")))) {
if (!(StringUtils.hasText(pollerElement.getAttribute("period")) ^ StringUtils.hasText(pollerElement.getAttribute("cron")))) {
throw new ConfigurationException("A <poller> element must define either a period "
+ "or a cron expression (but not both)");
}
if (StringUtils.hasText(element.getAttribute("period"))) {
Long period = Long.valueOf(element.getAttribute("period"));
if (StringUtils.hasText(pollerElement.getAttribute("period"))) {
Long period = Long.valueOf(pollerElement.getAttribute("period"));
schedule = new PollingSchedule(period);
String initialDelay = element.getAttribute("initial-delay");
String initialDelay = pollerElement.getAttribute("initial-delay");
if (StringUtils.hasText(initialDelay)) {
((PollingSchedule)schedule).setInitialDelay(Long.valueOf(initialDelay));
}
if ("true".equals(element.getAttribute("fixed-rate").toLowerCase())) {
if ("true".equals(pollerElement.getAttribute("fixed-rate").toLowerCase())) {
((PollingSchedule)schedule).setFixedRate(true);
}
else {
((PollingSchedule)schedule).setFixedRate(false);
}
}
if (StringUtils.hasText(element.getAttribute("cron"))) {
schedule = new CronSchedule(element.getAttribute("cron"));
if (StringUtils.hasText(pollerElement.getAttribute("cron"))) {
schedule = new CronSchedule(pollerElement.getAttribute("cron"));
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor");
Element txElement = DomUtils.getChildElementByTagName(element, "transactional");
if (txElement != null) {
builder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager"));
builder.addPropertyValue("propagationBehaviorName",
DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation"));
builder.addPropertyValue("isolationLevelName",
DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation"));
builder.addPropertyValue("transactionTimeout", txElement.getAttribute("timeout"));
builder.addPropertyValue("transactionReadOnly", txElement.getAttribute("read-only"));
}
builder.addConstructorArgReference(sourceBeanName);
builder.addConstructorArgValue(schedule);
setValueIfAttributeDefined(builder, element, "receive-timeout");
setValueIfAttributeDefined(builder, element, "send-timeout");
setValueIfAttributeDefined(builder, element, "max-messages-per-poll");
return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
targetBuilder.addPropertyValue("schedule", schedule);
}
/**
* Parse a "transactional" element and configure the "transactionManager" and "transactionDefinition"
* properties for the target builder.
*
* @param txElement the "transactional" element to parse
* @param targetBuilder the builder that expects the "transactionManager" and "transactionDefinition" properties
*/
public static void configureTransactionAttributes(Element txElement, BeanDefinitionBuilder targetBuilder) {
targetBuilder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager"));
DefaultTransactionDefinition txDefinition = new DefaultTransactionDefinition();
txDefinition.setPropagationBehaviorName(
DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation"));
txDefinition.setIsolationLevelName(
DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation"));
txDefinition.setTimeout(Integer.valueOf(txElement.getAttribute("timeout")));
txDefinition.setReadOnly(txElement.getAttribute("read-only").equalsIgnoreCase("true"));
targetBuilder.addPropertyValue("transactionDefinition", txDefinition);
}
}

View File

@@ -51,7 +51,7 @@ public class ResequencerParser extends AbstractSimpleBeanDefinitionParser {
@Override
protected void postProcess(BeanDefinitionBuilder builder, Element element) {
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE, "source");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, INPUT_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, OUTPUT_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, DISCARD_CHANNEL_ATTRIBUTE);
}

View File

@@ -28,7 +28,7 @@ import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
@@ -87,22 +87,22 @@ public abstract class AbstractMethodAnnotationPostProcessor<T extends Annotation
if (inputChannel == null) {
throw new ConfigurationException("unable to resolve inputChannel '" + inputChannelName + "'");
}
if (pollerAnnotation != null) {
if (inputChannel instanceof PollableChannel) {
PollingSchedule schedule = new PollingSchedule(pollerAnnotation.period());
schedule.setInitialDelay(pollerAnnotation.initialDelay());
schedule.setFixedRate(pollerAnnotation.fixedRate());
schedule.setTimeUnit(pollerAnnotation.timeUnit());
ChannelPoller poller = new ChannelPoller((PollableChannel) inputChannel, schedule);
poller.setMaxMessagesPerPoll(pollerAnnotation.maxMessagesPerPoll());
endpoint.setSource(poller);
if (endpoint instanceof AbstractMessageConsumingEndpoint) {
AbstractMessageConsumingEndpoint consumingEndpoint = (AbstractMessageConsumingEndpoint) endpoint;
if (pollerAnnotation != null) {
if (inputChannel instanceof PollableChannel) {
PollingSchedule schedule = new PollingSchedule(pollerAnnotation.period());
schedule.setInitialDelay(pollerAnnotation.initialDelay());
schedule.setFixedRate(pollerAnnotation.fixedRate());
schedule.setTimeUnit(pollerAnnotation.timeUnit());
consumingEndpoint.setSchedule(schedule);
consumingEndpoint.setMaxMessagesPerPoll(pollerAnnotation.maxMessagesPerPoll());
}
else {
throw new ConfigurationException("The @Poller annotation should only be provided for a PollableChannel");
}
}
else {
throw new ConfigurationException("The @Poller annotation should only be provided for a PollableSource");
}
}
else {
endpoint.setSource(inputChannel);
consumingEndpoint.setInputChannel(inputChannel);
}
if (endpoint instanceof AbstractInOutEndpoint) {
String outputChannelName = (String) AnnotationUtils.getValue(annotation, OUTPUT_CHANNEL_ATTRIBUTE);

View File

@@ -26,11 +26,9 @@ import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.InboundChannelAdapter;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.OutboundChannelAdapter;
import org.springframework.integration.endpoint.SourcePoller;
import org.springframework.integration.handler.MethodInvokingTarget;
import org.springframework.integration.message.MethodInvokingSource;
import org.springframework.integration.scheduling.PollingSchedule;
@@ -90,32 +88,22 @@ public class ChannelAdapterAnnotationPostProcessor implements MethodAnnotationPo
+ "when using the @ChannelAdapter annotation with a no-arg method.");
}
Schedule schedule = this.createSchedule(pollerAnnotation);
SourcePoller poller = new SourcePoller(source, schedule);
int maxMessagesPerPoll = pollerAnnotation.maxMessagesPerPoll();
if (maxMessagesPerPoll == -1) {
// the default is 1 since a MethodInvokingSource might return a non-null value
// every time it is invoked, thus producing an infinite number of messages per poll
maxMessagesPerPoll = 1;
}
poller.setMaxMessagesPerPoll(maxMessagesPerPoll);
InboundChannelAdapter adapter = new InboundChannelAdapter();
adapter.setSource(poller);
adapter.setSource(source);
adapter.setChannel(channel);
adapter.setSchedule(schedule);
adapter.setBeanName(this.generateUniqueName(channel.getName() + ".inboundAdapter"));
return adapter;
}
private OutboundChannelAdapter createOutboundChannelAdapter(MethodInvokingTarget target, MessageChannel channel, Poller pollerAnnotation) {
OutboundChannelAdapter adapter = new OutboundChannelAdapter(target);
adapter.setInputChannel(channel);
if (channel instanceof PollableChannel) {
Schedule schedule = (pollerAnnotation != null)
? this.createSchedule(pollerAnnotation)
: new PollingSchedule(0);
ChannelPoller poller = new ChannelPoller((PollableChannel) channel, schedule);
adapter.setSource(poller);
}
else {
adapter.setSource(channel);
adapter.setSchedule(schedule);
}
adapter.setBeanName(this.generateUniqueName(channel.getName() + ".outboundAdapter"));
return adapter;

View File

@@ -23,8 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
/**
* Base class for {@link MessageDispatcher} implementations.
@@ -35,21 +34,21 @@ public abstract class AbstractDispatcher implements MessageDispatcher {
protected final Log logger = LogFactory.getLog(this.getClass());
protected final Set<MessageEndpoint> endpoints = new CopyOnWriteArraySet<MessageEndpoint>();
protected final Set<MessageConsumer> subscribers = new CopyOnWriteArraySet<MessageConsumer>();
private volatile TaskExecutor taskExecutor;
public boolean subscribe(MessageEndpoint endpoint) {
return this.endpoints.add(endpoint);
public boolean subscribe(MessageConsumer consumer) {
return this.subscribers.add(consumer);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.endpoints.remove(endpoint);
public boolean unsubscribe(MessageConsumer consumer) {
return this.subscribers.remove(consumer);
}
/**
* Specify a {@link TaskExecutor} for invoking the endpoints.
* Specify a {@link TaskExecutor} for invoking the consumers.
* If none is provided, the invocation will occur in the thread
* that runs this polling dispatcher.
*/
@@ -61,15 +60,8 @@ public abstract class AbstractDispatcher implements MessageDispatcher {
return this.taskExecutor;
}
/**
* A convenience method for subclasses to send a Message to a single endpoint.
*/
protected final boolean sendMessageToEndpoint(Message<?> message, MessageEndpoint endpoint) {
return endpoint.send(message);
}
public String toString() {
return this.getClass().getSimpleName() + " with endpoints: " + this.endpoints;
return this.getClass().getSimpleName() + " with subscribers: " + this.subscribers;
}
}

View File

@@ -17,9 +17,9 @@
package org.springframework.integration.dispatcher;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageConsumer;
/**
* A broadcasting dispatcher implementation. It makes a best effort to
@@ -45,8 +45,8 @@ public class BroadcastingDispatcher extends AbstractDispatcher {
public boolean dispatch(Message<?> message) {
int sequenceNumber = 1;
int sequenceSize = this.endpoints.size();
for (final MessageEndpoint endpoint : this.endpoints) {
int sequenceSize = this.subscribers.size();
for (final MessageConsumer consumer : this.subscribers) {
final Message<?> messageToSend = (!this.applySequence) ? message
: MessageBuilder.fromMessage(message)
.setSequenceNumber(sequenceNumber++)
@@ -56,12 +56,12 @@ public class BroadcastingDispatcher extends AbstractDispatcher {
if (executor != null) {
executor.execute(new Runnable() {
public void run() {
sendMessageToEndpoint(messageToSend, endpoint);
consumer.onMessage(messageToSend);
}
});
}
else {
this.sendMessageToEndpoint(messageToSend, endpoint);
consumer.onMessage(messageToSend);
}
}
return true;

View File

@@ -16,50 +16,45 @@
package org.springframework.integration.dispatcher;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageRejectedException;
/**
* Basic implementation of {@link MessageDispatcher} that will attempt
* to send a {@link Message} to one of its endpoints. As soon as <em>one</em>
* of the endpoints accepts the Message, the dispatcher will return 'true'.
* to send a {@link Message} to one of its subscribers. As soon as <em>one</em>
* of the subscribers accepts the Message, the dispatcher will return 'true'.
* <p>
* If the dispatcher has no endpoints, a {@link MessageDeliveryException}
* will be thrown. If all endpoints reject the Message, the dispatcher will
* throw a MessageRejectedException. If all endpoints return 'false'
* (e.g. due to a timeout), the dispatcher will return 'false'.
* If the dispatcher has no subscribers, a {@link MessageDeliveryException}
* will be thrown. If all subscribers reject the Message, the dispatcher will
* throw a MessageRejectedException.
*
* @author Mark Fisher
*/
public class SimpleDispatcher extends AbstractDispatcher {
public boolean dispatch(Message<?> message) {
if (this.endpoints.size() == 0) {
if (this.subscribers.size() == 0) {
throw new MessageDeliveryException(message, "Dispatcher has no subscribers.");
}
int count = 0;
int rejectedExceptionCount = 0;
for (MessageEndpoint endpoint : this.endpoints) {
for (MessageConsumer consumer : this.subscribers) {
count++;
try {
if (this.sendMessageToEndpoint(message, endpoint)) {
return true;
}
if (logger.isDebugEnabled()) {
logger.debug("Failed to send message to endpoint, continuing with other endpoints if available.");
}
consumer.onMessage(message);
return true;
}
catch (MessageRejectedException e) {
rejectedExceptionCount++;
if (logger.isDebugEnabled()) {
logger.debug("Endpoint '" + endpoint + "' rejected Message, continuing with other endpoints if available.", e);
logger.debug("Consumer '" + consumer + "' rejected Message, continuing with other subscribers if available.", e);
}
}
}
if (rejectedExceptionCount == count) {
throw new MessageRejectedException(message, "All of dispatcher's endpoints rejected Message.");
throw new MessageRejectedException(message, "All of dispatcher's subscribers rejected Message.");
}
return false;
}

View File

@@ -24,31 +24,35 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageExchangeTemplate;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.integration.scheduling.TaskSchedulerAware;
import org.springframework.integration.util.ErrorHandler;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
/**
* The base class for Message Endpoint implementations.
*
* @author Mark Fisher
*/
public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegistryAware, BeanNameAware, InitializingBean {
public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegistryAware, TaskSchedulerAware, BeanNameAware, InitializingBean {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile String name;
private MessageSource<?> source;
private volatile ChannelRegistry channelRegistry;
private volatile TaskScheduler taskScheduler;
private volatile PlatformTransactionManager transactionManager;
private volatile TransactionDefinition transactionDefinition;
private volatile ErrorHandler errorHandler;
private volatile ChannelRegistry channelRegistry;
private final MessageExchangeTemplate messageExchangeTemplate = new MessageExchangeTemplate();
@@ -63,14 +67,6 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist
this.name = name;
}
public MessageSource<?> getSource() {
return this.source;
}
public void setSource(MessageSource<?> source) {
this.source = source;
}
protected ChannelRegistry getChannelRegistry() {
return this.channelRegistry;
}
@@ -79,6 +75,22 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist
this.channelRegistry = channelRegistry;
}
protected TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
public void setTaskScheduler(TaskScheduler taskScheduler) {
this.taskScheduler = taskScheduler;
}
public void setTransactionManager(PlatformTransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public void setTransactionDefinition(TransactionDefinition transactionDefinition) {
this.transactionDefinition= transactionDefinition;
}
protected MessageExchangeTemplate getMessageExchangeTemplate() {
return this.messageExchangeTemplate;
}
@@ -94,9 +106,6 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist
}
public final void afterPropertiesSet() {
if (this.source != null && (this.source instanceof SubscribableSource)) {
((SubscribableSource) this.source).subscribe(this);
}
try {
this.initialize();
}
@@ -114,31 +123,7 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist
protected void initialize() throws Exception {
}
public final boolean send(Message<?> message) {
if (message == null || message.getPayload() == null) {
throw new IllegalArgumentException("Message and its payload must not be null");
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("endpoint '" + this + "' processing message: " + message);
}
try {
return this.sendInternal(message);
}
catch (Exception e) {
if (e instanceof MessagingException) {
this.handleException((MessagingException) e);
}
else {
this.handleException(new MessageHandlingException(message,
"failure occurred in endpoint '" + this.toString() + "'", e));
}
return false;
}
}
protected abstract boolean sendInternal(Message<?> message);
private void handleException(MessagingException exception) {
protected void handleException(MessagingException exception) {
if (this.errorHandler == null) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("exception occurred in endpoint '" + this.name + "'", exception);
@@ -148,6 +133,18 @@ public abstract class AbstractEndpoint implements MessageEndpoint, ChannelRegist
this.errorHandler.handle(exception);
}
protected final void configureTransactionSettingsForPoller(AbstractPoller poller) {
if (this.transactionManager != null) {
poller.setTransactionManager(this.transactionManager);
}
if (this.transactionDefinition != null) {
poller.setPropagationBehavior(this.transactionDefinition.getPropagationBehavior());
poller.setIsolationLevel(this.transactionDefinition.getIsolationLevel());
poller.setTransactionReadOnly(this.transactionDefinition.isReadOnly());
poller.setTransactionTimeout(this.transactionDefinition.getTimeout());
}
}
public String toString() {
return (this.name != null) ? this.name : super.toString();
}

View File

@@ -34,7 +34,7 @@ import org.springframework.integration.message.selector.MessageSelector;
/**
* @author Mark Fisher
*/
public abstract class AbstractInOutEndpoint extends AbstractEndpoint {
public abstract class AbstractInOutEndpoint extends AbstractMessageConsumingEndpoint {
private MessageChannel outputChannel;
@@ -73,11 +73,11 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint {
}
@Override
protected boolean sendInternal(Message<?> message) {
protected void processMessage(Message<?> message) {
for (EndpointInterceptor interceptor : this.interceptors) {
message = interceptor.preHandle(message);
if (message == null) {
return false;
return;
}
}
if (!this.supports(message)) {
@@ -89,7 +89,7 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint {
throw new MessageHandlingException(message, "endpoint '" + this.getName()
+ " requires a reply, but no reply was received");
}
return true;
return;
}
Message<?> reply = null;
if (result instanceof Message && result.equals(message)) {
@@ -106,10 +106,9 @@ public abstract class AbstractInOutEndpoint extends AbstractEndpoint {
boolean sent = this.sendReplyMessage(nextReply, replyChannel);
sentAtLeastOne = (sentAtLeastOne || sent);
}
return sentAtLeastOne;
}
else {
return this.sendReplyMessage(reply, replyChannel);
this.sendReplyMessage(reply, replyChannel);
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.endpoint;
import org.springframework.context.Lifecycle;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageHandlingException;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
/**
* The base class for Message Endpoint implementations that consume Messages.
*
* @author Mark Fisher
*/
public abstract class AbstractMessageConsumingEndpoint extends AbstractEndpoint implements MessageConsumer, Lifecycle {
private volatile MessageChannel inputChannel;
private volatile Schedule schedule = new PollingSchedule(0);
private volatile ChannelPoller poller;
private volatile int maxMessagesPerPoll = -1;
private volatile boolean initialized;
private volatile boolean running;
private final Object lifecycleMonitor = new Object();
public void setInputChannel(MessageChannel inputChannel) {
this.inputChannel = inputChannel;
}
public void setSchedule(Schedule schedule) {
this.schedule = schedule;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
if (this.poller != null) {
this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll);
}
}
public final boolean isRunning() {
return this.running;
}
@Override
protected void initialize() throws Exception {
synchronized (this.lifecycleMonitor) {
if (this.inputChannel instanceof PollableChannel && this.poller == null) {
this.poller = new ChannelPoller((PollableChannel) this.inputChannel, this.schedule);
this.poller.setMaxMessagesPerPoll(this.maxMessagesPerPoll);
this.configureTransactionSettingsForPoller(this.poller);
this.poller.subscribe(this);
}
this.initialized = true;
}
}
public final void start() {
synchronized (this.lifecycleMonitor) {
if (this.running) {
return;
}
if (!this.initialized) {
this.afterPropertiesSet();
}
if (this.inputChannel == null) {
throw new ConfigurationException("failed to start endpoint, inputChannel is required");
}
if (this.inputChannel instanceof SubscribableSource) {
((SubscribableSource) inputChannel).subscribe(this);
}
else if (this.inputChannel instanceof PollableChannel) {
if (this.getTaskScheduler() == null) {
throw new ConfigurationException("failed to start endpoint, no taskScheduler available");
}
this.getTaskScheduler().schedule(poller);
}
this.running = true;
}
}
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (!this.running) {
return;
}
if (this.inputChannel instanceof SubscribableSource) {
((SubscribableSource) inputChannel).unsubscribe(this);
}
else if (this.poller != null) {
this.getTaskScheduler().cancel(poller, true);
}
this.running = false;
}
}
public final void onMessage(Message<?> message) {
if (message == null || message.getPayload() == null) {
throw new IllegalArgumentException("Message and its payload must not be null");
}
if (this.logger.isDebugEnabled()) {
this.logger.debug("endpoint '" + this + "' processing message: " + message);
}
try {
this.processMessage(message);
}
catch (Exception e) {
if (e instanceof MessagingException) {
this.handleException((MessagingException) e);
}
else {
this.handleException(new MessageHandlingException(message,
"failure occurred in endpoint '" + this.toString() + "'", e));
}
}
}
protected abstract void processMessage(Message<?> message);
}

View File

@@ -18,11 +18,11 @@ package org.springframework.integration.endpoint;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.SchedulableTask;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
@@ -30,7 +30,7 @@ import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
public abstract class AbstractPoller implements SubscribableSource, SchedulableTask, InitializingBean {
public abstract class AbstractPoller implements SchedulableTask, InitializingBean {
public static final int MAX_MESSAGES_UNBOUNDED = -1;
@@ -45,11 +45,11 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT
private volatile TransactionTemplate transactionTemplate;
private volatile String propagationBehaviorName = "PROPAGATION_REQUIRED";
private volatile int propagationBehavior = DefaultTransactionDefinition.PROPAGATION_REQUIRED;
private volatile String isolationLevelName = "ISOLATION_DEFAULT";
private volatile int isolationLevel = DefaultTransactionDefinition.ISOLATION_DEFAULT;
private volatile int transactionTimeout = -1;
private volatile int transactionTimeout = DefaultTransactionDefinition.TIMEOUT_DEFAULT;
private volatile boolean readOnly = false;
@@ -94,12 +94,12 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT
this.transactionManager = transactionManager;
}
public void setPropagationBehaviorName(String propagationBehaviorName) {
this.propagationBehaviorName = propagationBehaviorName;
public void setPropagationBehavior(int propagationBehavior) {
this.propagationBehavior = propagationBehavior;
}
public void setIsolationLevelName(String isolationLevelName) {
this.isolationLevelName = isolationLevelName;
public void setIsolationLevel(int isolationLevel) {
this.isolationLevel = isolationLevel;
}
public void setTransactionTimeout(int transactionTimeout) {
@@ -124,8 +124,8 @@ public abstract class AbstractPoller implements SubscribableSource, SchedulableT
}
if (this.transactionManager != null) {
TransactionTemplate template = new TransactionTemplate(this.transactionManager);
template.setPropagationBehaviorName(this.propagationBehaviorName);
template.setIsolationLevelName(this.isolationLevelName);
template.setPropagationBehavior(this.propagationBehavior);
template.setIsolationLevel(this.isolationLevel);
template.setTimeout(this.transactionTimeout);
template.setReadOnly(this.readOnly);
this.transactionTemplate = template;

View File

@@ -19,6 +19,7 @@ package org.springframework.integration.endpoint;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.dispatcher.SimpleDispatcher;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.util.Assert;
@@ -50,12 +51,12 @@ public class ChannelPoller extends AbstractPoller implements SubscribableSource
this.receiveTimeout = receiveTimeout;
}
public boolean subscribe(MessageEndpoint endpoint) {
return this.dispatcher.subscribe(endpoint);
public boolean subscribe(MessageConsumer consumer) {
return this.dispatcher.subscribe(consumer);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.dispatcher.unsubscribe(endpoint);
public boolean unsubscribe(MessageConsumer consumer) {
return this.dispatcher.unsubscribe(consumer);
}
@Override

View File

@@ -16,11 +16,12 @@
package org.springframework.integration.endpoint;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryAware;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.MethodInvokingSource;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.integration.scheduling.TaskScheduler;
/**
* A Channel Adapter implementation for connecting a
@@ -29,33 +30,75 @@ import org.springframework.integration.message.MessagingException;
*
* @author Mark Fisher
*/
public class InboundChannelAdapter extends AbstractEndpoint {
public class InboundChannelAdapter extends AbstractEndpoint implements Lifecycle {
private MessageChannel channel;
private volatile PollableSource<?> source;
private volatile MessageChannel channel;
private volatile Schedule schedule;
private volatile SourcePoller poller;
private volatile int maxMessagesPerPoll = -1;
private volatile boolean running;
private final Object lifecycleMonitor = new Object();
public void setSource(PollableSource<?> source) {
this.source = source;
}
public void setChannel(MessageChannel channel) {
this.channel = channel;
}
@Override
protected boolean sendInternal(Message<?> message) {
if (this.channel == null) {
throw new MessageDeliveryException(message, "no channel has been provided");
public void setSchedule(Schedule schedule) {
this.schedule = schedule;
}
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
if (this.poller != null) {
this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll);
}
try {
boolean sent = this.getMessageExchangeTemplate().send(message, this.channel);
if (sent && this.getSource() instanceof MessageDeliveryAware) {
((MessageDeliveryAware) this.getSource()).onSend(message);
}
public final boolean isRunning() {
return this.running;
}
public final void start() {
synchronized (this.lifecycleMonitor) {
if (this.running) {
return;
}
this.poller = new SourcePoller(source, channel, schedule);
if (maxMessagesPerPoll < 0 && source instanceof MethodInvokingSource) {
// the default is 1 since a MethodInvokingSource might return a non-null value
// every time it is invoked, thus producing an infinite number of messages per poll
maxMessagesPerPoll = 1;
}
this.configureTransactionSettingsForPoller(this.poller);
this.poller.setMaxMessagesPerPoll(maxMessagesPerPoll);
TaskScheduler taskScheduler = this.getTaskScheduler();
if (taskScheduler != null) {
taskScheduler.schedule(this.poller);
}
return sent;
}
catch (Exception e) {
if (this.getSource() instanceof MessageDeliveryAware) {
((MessageDeliveryAware) this.getSource()).onFailure(message, e);
}
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (!this.running) {
return;
}
TaskScheduler taskScheduler = this.getTaskScheduler();
if (taskScheduler != null) {
taskScheduler.cancel(this.poller, true);
}
throw (e instanceof MessagingException) ? (MessagingException) e
: new MessageDeliveryException(message, "channel adapter failed to send message to target", e);
}
}

View File

@@ -16,8 +16,6 @@
package org.springframework.integration.endpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
/**
* Base interface for message endpoints.
@@ -28,8 +26,4 @@ public interface MessageEndpoint {
String getName();
MessageSource<?> getSource();
boolean send(Message<?> message);
}

View File

@@ -18,6 +18,7 @@ package org.springframework.integration.endpoint;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.util.Assert;
@@ -27,7 +28,7 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
public class OutboundChannelAdapter extends AbstractEndpoint {
public class OutboundChannelAdapter extends AbstractMessageConsumingEndpoint {
private final MessageTarget target;
@@ -39,8 +40,10 @@ public class OutboundChannelAdapter extends AbstractEndpoint {
@Override
protected boolean sendInternal(Message<?> message) {
return this.target.send(message);
protected void processMessage(Message<?> message) {
if (!this.target.send(message)) {
throw new MessageDeliveryException(message, "failed to deliver Message to target");
}
}
}

View File

@@ -50,6 +50,7 @@ public class ServiceActivatorEndpoint extends AbstractInOutEndpoint {
@Override
protected void initialize() throws Exception {
super.initialize();
if (this.invoker instanceof InitializingBean) {
((InitializingBean) this.invoker).afterPropertiesSet();
}

View File

@@ -16,30 +16,34 @@
package org.springframework.integration.endpoint;
import org.springframework.integration.dispatcher.SimpleDispatcher;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.message.BlockingSource;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryAware;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
public class SourcePoller extends AbstractPoller implements SubscribableSource {
public class SourcePoller extends AbstractPoller {
private final PollableSource<?> source;
private final SimpleDispatcher dispatcher = new SimpleDispatcher();
private final MessageChannel channel;
private volatile long receiveTimeout = 1000;
public SourcePoller(PollableSource<?> source, Schedule schedule) {
public SourcePoller(PollableSource<?> source, MessageChannel channel, Schedule schedule) {
super(schedule);
Assert.notNull(source, "source must not be null");
Assert.notNull(channel, "channel must not be null");
this.source = source;
this.channel = channel;
}
@@ -54,14 +58,6 @@ public class SourcePoller extends AbstractPoller implements SubscribableSource {
this.receiveTimeout = receiveTimeout;
}
public boolean subscribe(MessageEndpoint endpoint) {
return this.dispatcher.subscribe(endpoint);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.dispatcher.unsubscribe(endpoint);
}
@Override
protected boolean doPoll() {
Message<?> message = (this.receiveTimeout >= 0 && this.source instanceof BlockingSource)
@@ -70,7 +66,20 @@ public class SourcePoller extends AbstractPoller implements SubscribableSource {
if (message == null) {
return false;
}
return this.dispatcher.dispatch(message);
try {
boolean sent = this.channel.send(message);
if (sent && this.source instanceof MessageDeliveryAware) {
((MessageDeliveryAware) this.source).onSend(message);
}
return sent;
}
catch (Exception e) {
if (this.source instanceof MessageDeliveryAware) {
((MessageDeliveryAware) this.source).onFailure(message, e);
}
throw (e instanceof MessagingException) ? (MessagingException) e
: new MessageDeliveryException(message, "source poller failed to send message to channel", e);
}
}
}

View File

@@ -205,7 +205,7 @@ public class SimpleMessagingGateway extends MessagingGatewaySupport implements M
}
ReplyMessageCorrelator correlator = new ReplyMessageCorrelator(this.replyMapCapacity);
correlator.setBeanName("internal.correlator." + this);
correlator.setSource(this.replyChannel);
correlator.setInputChannel(this.replyChannel);
correlator.afterPropertiesSet();
this.endpointRegistry.registerEndpoint(correlator);
this.replyMessageCorrelator = correlator;

View File

@@ -16,8 +16,6 @@
package org.springframework.integration.message;
import org.springframework.integration.endpoint.MessageEndpoint;
/**
* Interface for any source of messages that accepts subscribers.
*
@@ -26,13 +24,13 @@ import org.springframework.integration.endpoint.MessageEndpoint;
public interface SubscribableSource extends MessageSource {
/**
* Register a {@link MessageEndpoint} as a subscriber to this source.
* Register a {@link MessageConsumer} as a subscriber to this source.
*/
boolean subscribe(MessageEndpoint endpoint);
boolean subscribe(MessageConsumer consumer);
/**
* Remove a {@link MessageEndpoint} from the subscribers of this source.
* Remove a {@link MessageConsumer} from the subscribers of this source.
*/
boolean unsubscribe(MessageEndpoint endpoint);
boolean unsubscribe(MessageConsumer consumer);
}

View File

@@ -21,7 +21,7 @@ import java.util.Collection;
import org.springframework.integration.channel.ChannelRegistry;
import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageExchangeTemplate;
@@ -30,7 +30,7 @@ import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
public class RouterEndpoint extends AbstractEndpoint {
public class RouterEndpoint extends AbstractMessageConsumingEndpoint {
private final ChannelResolver channelResolver;
@@ -78,7 +78,7 @@ public class RouterEndpoint extends AbstractEndpoint {
}
@Override
protected boolean sendInternal(Message<?> message) {
protected void processMessage(Message<?> message) {
boolean sent = false;
Collection<MessageChannel> results = this.channelResolver.resolveChannels(message);
if (results != null) {
@@ -99,7 +99,6 @@ public class RouterEndpoint extends AbstractEndpoint {
"no target resolved by router and no default output channel defined");
}
}
return sent;
}
}

View File

@@ -34,7 +34,6 @@ import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.InboundChannelAdapter;
import org.springframework.integration.endpoint.SourcePoller;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
@@ -66,7 +65,7 @@ public class DefaultMessageBusTests {
}
};
endpoint.setBeanName("testEndpoint");
endpoint.setSource(sourceChannel);
endpoint.setInputChannel(sourceChannel);
bus.registerEndpoint(endpoint);
bus.start();
Message<?> result = targetChannel.receive(3000);
@@ -126,10 +125,10 @@ public class DefaultMessageBusTests {
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setInputChannel(inputChannel);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setInputChannel(inputChannel);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
@@ -169,10 +168,10 @@ public class DefaultMessageBusTests {
bus.registerChannel(outputChannel1);
bus.registerChannel(outputChannel2);
endpoint1.setBeanName("testEndpoint1");
endpoint1.setSource(inputChannel);
endpoint1.setInputChannel(inputChannel);
endpoint1.setOutputChannel(outputChannel1);
endpoint2.setBeanName("testEndpoint2");
endpoint2.setSource(inputChannel);
endpoint2.setInputChannel(inputChannel);
endpoint2.setOutputChannel(outputChannel2);
bus.registerEndpoint(endpoint1);
bus.registerEndpoint(endpoint2);
@@ -191,18 +190,21 @@ public class DefaultMessageBusTests {
public void testErrorChannelWithFailedDispatch() throws InterruptedException {
MessageBus bus = new DefaultMessageBus();
QueueChannel errorChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
errorChannel.setBeanName("errorChannel");
bus.registerChannel(errorChannel);
CountDownLatch latch = new CountDownLatch(1);
InboundChannelAdapter channelAdapter = new InboundChannelAdapter();
SourcePoller poller = new SourcePoller(new FailingSource(latch), new PollingSchedule(1000));
channelAdapter.setSource(poller);
channelAdapter.setSource(new FailingSource(latch));
channelAdapter.setSchedule(new PollingSchedule(1000));
channelAdapter.setChannel(outputChannel);
channelAdapter.setBeanName("testChannel");
bus.registerEndpoint(channelAdapter);
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = errorChannel.receive(5000);
bus.stop();
assertNull(outputChannel.receive(0));
assertNotNull("message should not be null", message);
assertTrue(message instanceof ErrorMessage);
Throwable exception = ((ErrorMessage) message).getPayload();
@@ -237,7 +239,7 @@ public class DefaultMessageBusTests {
}
};
endpoint.setBeanName("testEndpoint");
endpoint.setSource(errorChannel);
endpoint.setInputChannel(errorChannel);
bus.registerEndpoint(endpoint);
bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));

View File

@@ -61,7 +61,7 @@ public class DirectChannelSubscriptionTests {
public void testSendAndReceiveForRegisteredEndpoint() {
MethodInvoker invoker = new MessageMappingMethodInvoker(new TestBean(), "handle");
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(invoker);
endpoint.setSource(sourceChannel);
endpoint.setInputChannel(sourceChannel);
endpoint.setOutputChannel(targetChannel);
endpoint.setBeanName("testEndpoint");
bus.registerEndpoint(endpoint);
@@ -95,7 +95,7 @@ public class DirectChannelSubscriptionTests {
throw new RuntimeException("intentional test failure");
}
};
endpoint.setSource(sourceChannel);
endpoint.setInputChannel(sourceChannel);
endpoint.setOutputChannel(targetChannel);
endpoint.setBeanName("testEndpoint");
bus.registerEndpoint(endpoint);

View File

@@ -12,7 +12,7 @@
<bean id="endpoint" class="org.springframework.integration.endpoint.ServiceActivatorEndpoint">
<constructor-arg ref="handler"/>
<property name="source" ref="sourceChannel"/>
<property name="inputChannel" ref="sourceChannel"/>
<property name="outputChannel" ref="targetChannel"/>
</bean>

View File

@@ -24,10 +24,8 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.StringMessage;
/**
@@ -62,7 +60,7 @@ public class DirectChannelTests {
}
private static class ThreadNameExtractingTestTarget implements MessageEndpoint {
private static class ThreadNameExtractingTestTarget implements MessageConsumer {
private String threadName;
@@ -77,21 +75,11 @@ public class DirectChannelTests {
this.latch = latch;
}
public boolean send(Message<?> message) {
public void onMessage(Message<?> message) {
this.threadName = Thread.currentThread().getName();
if (this.latch != null) {
this.latch.countDown();
}
return true;
}
// TODO: remove once this is a consumer instead of endpoint
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
}

View File

@@ -19,8 +19,8 @@ package org.springframework.integration.channel.config;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.SubscribableSource;
/**
@@ -28,20 +28,20 @@ import org.springframework.integration.message.SubscribableSource;
*/
public class TestSubscribableSource implements SubscribableSource {
private final List<MessageEndpoint> endpoints = new CopyOnWriteArrayList<MessageEndpoint>();
private final List<MessageConsumer> subscibers = new CopyOnWriteArrayList<MessageConsumer>();
public boolean subscribe(MessageEndpoint endpoint) {
return this.endpoints.add(endpoint);
public boolean subscribe(MessageConsumer subsciber) {
return this.subscibers.add(subsciber);
}
public boolean unsubscribe(MessageEndpoint endpoint) {
return this.endpoints.remove(endpoint);
public boolean unsubscribe(MessageConsumer subsciber) {
return this.subscibers.remove(subsciber);
}
public void publishMessage(Message<?> message) {
for (MessageEndpoint endpoint : this.endpoints) {
endpoint.send(message);
for (MessageConsumer subsciber : this.subscibers) {
subsciber.onMessage(message);
}
}

View File

@@ -61,7 +61,7 @@ public class AggregatorParserTests {
outboundMessages.add(createMessage("789", "id1", 3, 3, null));
outboundMessages.add(createMessage("456", "id1", 3, 2, null));
for (Message<?> message : outboundMessages) {
endpoint.send(message);
endpoint.onMessage(message);
}
Assert.assertEquals("One and only one message must have been aggregated", 1, aggregatorBean
.getAggregatedMessages().size());
@@ -111,7 +111,7 @@ public class AggregatorParserTests {
outboundMessages.add(createMessage(2l, "id1", 3, 3, null));
outboundMessages.add(createMessage(3l, "id1", 3, 2, null));
for (Message<?> message : outboundMessages) {
addingAggregator.send(message);
addingAggregator.onMessage(message);
}
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> response = outputChannel.receive();
@@ -140,13 +140,13 @@ public class AggregatorParserTests {
MethodInvoker invoker = (MethodInvoker) completionStrategyAccessor.getPropertyValue("invoker");
Assert.assertTrue(new DirectFieldAccessor(invoker).getPropertyValue("object") instanceof MaxValueCompletionStrategy);
Assert.assertTrue(((Method)completionStrategyAccessor.getPropertyValue("method")).getName().equals("checkCompleteness"));
aggregatorWithPojoCompletionStrategy.send(createMessage(1l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(2l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.send(createMessage(3l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(1l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(2l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(3l, "id1", 0 , 0, null));
PollableChannel outputChannel = (PollableChannel) context.getBean("outputChannel");
Message<?> reply = outputChannel.receive(0);
Assert.assertNull(reply);
aggregatorWithPojoCompletionStrategy.send(createMessage(5l, "id1", 0 , 0, null));
aggregatorWithPojoCompletionStrategy.onMessage(createMessage(5l, "id1", 0 , 0, null));
reply = outputChannel.receive(0);
Assert.assertNotNull(reply);
Assert.assertEquals(11l, reply.getPayload());

View File

@@ -19,7 +19,6 @@ package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.TimeUnit;
@@ -28,10 +27,10 @@ import org.junit.Test;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.StringMessage;
@@ -57,11 +56,11 @@ public class EndpointParserTests {
public void testEndpointWithSelectorAccepts() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithSelector.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint");
MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint");
QueueChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
assertTrue(endpoint.send(message));
endpoint.onMessage(message);
Message<?> reply = replyChannel.receive(500);
assertNotNull(reply);
assertEquals("foo", reply.getPayload());
@@ -71,11 +70,11 @@ public class EndpointParserTests {
public void testEndpointWithSelectorRejects() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"endpointWithSelector.xml", this.getClass());
MessageEndpoint endpoint = (MessageEndpoint) context.getBean("endpoint");
MessageConsumer endpoint = (MessageConsumer) context.getBean("endpoint");
MessageChannel replyChannel = new QueueChannel();
Message<?> message = MessageBuilder.fromPayload(123)
.setReturnAddress(replyChannel).build();
endpoint.send(message);
endpoint.onMessage(message);
}
@Test

View File

@@ -44,11 +44,11 @@ import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.SubscribableSource;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
import org.springframework.integration.util.MethodInvoker;
@@ -295,10 +295,10 @@ public class MessagingAnnotationPostProcessorTests {
AnnotatedEndpointWithPolledAnnotation endpoint = new AnnotatedEndpointWithPolledAnnotation();
postProcessor.postProcessAfterInitialization(endpoint, "testBean");
ServiceActivatorEndpoint processedEndpoint = (ServiceActivatorEndpoint) messageBus.lookupEndpoint("testBean.serviceActivator");
processedEndpoint.afterPropertiesSet();
DirectFieldAccessor accessor = new DirectFieldAccessor(processedEndpoint);
MessageSource<?> source = (MessageSource<?>) accessor.getPropertyValue("source");
assertTrue(source instanceof SubscribableSource);
Schedule schedule = (Schedule) new DirectFieldAccessor(source).getPropertyValue("schedule");
ChannelPoller poller = (ChannelPoller) accessor.getPropertyValue("poller");
Schedule schedule = (Schedule) new DirectFieldAccessor(poller).getPropertyValue("schedule");
assertEquals(PollingSchedule.class, schedule.getClass());
PollingSchedule pollingSchedule = (PollingSchedule) schedule;
assertEquals(1234, pollingSchedule.getPeriod());
@@ -318,17 +318,10 @@ public class MessagingAnnotationPostProcessorTests {
DirectChannel testChannel = (DirectChannel) messageBus.lookupChannel("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Message<?>> receivedMessage = new AtomicReference<Message<?>>();
testChannel.subscribe(new org.springframework.integration.endpoint.MessageEndpoint() {
public boolean send(Message<?> message) {
testChannel.subscribe(new MessageConsumer() {
public void onMessage(Message<?> message) {
receivedMessage.set(message);
latch.countDown();
return false;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
});
latch.await(3, TimeUnit.SECONDS);

View File

@@ -17,7 +17,6 @@
package org.springframework.integration.dispatcher;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.getCurrentArguments;
import static org.easymock.EasyMock.isA;
@@ -35,9 +34,8 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.StringMessage;
/**
@@ -52,11 +50,11 @@ public class BroadcastingDispatcherTests {
private Message<?> messageMock = createMock(Message.class);
private MessageEndpoint targetMock1 = createMock(MessageEndpoint.class);
private MessageConsumer targetMock1 = createMock(MessageConsumer.class);
private MessageEndpoint targetMock2 = createMock(MessageEndpoint.class);
private MessageConsumer targetMock2 = createMock(MessageConsumer.class);
private MessageEndpoint targetMock3 = createMock(MessageEndpoint.class);
private MessageConsumer targetMock3 = createMock(MessageConsumer.class);
private Object[] globalMocks = new Object[] {
messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3 };
@@ -75,7 +73,8 @@ public class BroadcastingDispatcherTests {
public void singleTargetWithoutTaskExecutor() throws Exception {
dispatcher.setTaskExecutor(null);
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -84,7 +83,8 @@ public class BroadcastingDispatcherTests {
@Test
public void singleTargetWithTaskExecutor() throws Exception {
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -96,9 +96,12 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -109,9 +112,12 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -124,8 +130,10 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(false, true, true);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
targetMock2.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -138,8 +146,10 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(true, false, true);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -152,8 +162,10 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
partialFailingExecutorMock(true, true, false);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock2.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock2.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -176,7 +188,8 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock1);
expect(targetMock1.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -188,8 +201,10 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
dispatcher.unsubscribe(targetMock2);
expect(targetMock1.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true);
targetMock1.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
dispatcher.dispatch(messageMock);
verify(globalMocks);
@@ -200,9 +215,12 @@ public class BroadcastingDispatcherTests {
dispatcher.subscribe(targetMock1);
dispatcher.subscribe(targetMock2);
dispatcher.subscribe(targetMock3);
expect(targetMock1.send(messageMock)).andReturn(true).times(2);
expect(targetMock2.send(messageMock)).andReturn(true);
expect(targetMock3.send(messageMock)).andReturn(true).times(2);
targetMock1.onMessage(messageMock);
expectLastCall().times(2);
targetMock2.onMessage(messageMock);
expectLastCall();
targetMock3.onMessage(messageMock);
expectLastCall().times(2);
replay(globalMocks);
dispatcher.dispatch(messageMock);
dispatcher.unsubscribe(targetMock2);
@@ -214,8 +232,8 @@ public class BroadcastingDispatcherTests {
public void applySequenceDisabledByDefault() {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageEndpoint target1 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target2 = new MessageStoringTestEndpoint(messages);
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.dispatch(new StringMessage("test"));
@@ -231,9 +249,9 @@ public class BroadcastingDispatcherTests {
BroadcastingDispatcher dispatcher = new BroadcastingDispatcher();
dispatcher.setApplySequence(true);
final List<Message<?>> messages = Collections.synchronizedList(new ArrayList<Message<?>>());
MessageEndpoint target1 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target2 = new MessageStoringTestEndpoint(messages);
MessageEndpoint target3 = new MessageStoringTestEndpoint(messages);
MessageConsumer target1 = new MessageStoringTestEndpoint(messages);
MessageConsumer target2 = new MessageStoringTestEndpoint(messages);
MessageConsumer target3 = new MessageStoringTestEndpoint(messages);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -276,7 +294,7 @@ public class BroadcastingDispatcherTests {
}
private static class MessageStoringTestEndpoint implements MessageEndpoint {
private static class MessageStoringTestEndpoint implements MessageConsumer {
private final List<Message<?>> messageList;
@@ -284,17 +302,8 @@ public class BroadcastingDispatcherTests {
this.messageList = messageList;
}
public boolean send(Message<?> message) {
public void onMessage(Message<?> message) {
this.messageList.add(message);
return true;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
}
};

View File

@@ -27,13 +27,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.endpoint.AbstractInOutEndpoint;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.ServiceActivatorEndpoint;
import org.springframework.integration.handler.TestHandlers;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.message.selector.MessageSelector;
@@ -70,10 +69,15 @@ public class SimpleDispatcherTests {
public void noDuplicateSubscriptions() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target = new CountingTestEndpoint(counter, false);
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.subscribe(target);
dispatcher.dispatch(new StringMessage("test"));
try {
dispatcher.dispatch(new StringMessage("test"));
}
catch (Exception e) {
// ignore
}
assertEquals("target should not have duplicate subscriptions", 1, counter.get());
}
@@ -81,14 +85,19 @@ public class SimpleDispatcherTests {
public void unsubscribeBeforeSend() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.unsubscribe(target2);
dispatcher.dispatch(new StringMessage("test"));
try {
dispatcher.dispatch(new StringMessage("test"));
}
catch (Exception e) {
// ignore
}
assertEquals(2, counter.get());
}
@@ -96,19 +105,34 @@ public class SimpleDispatcherTests {
public void unsubscribeBetweenSends() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
dispatcher.dispatch(new StringMessage("test1"));
try {
dispatcher.dispatch(new StringMessage("test1"));
}
catch (Exception e) {
// ignore
}
assertEquals(3, counter.get());
dispatcher.unsubscribe(target2);
dispatcher.dispatch(new StringMessage("test2"));
try {
dispatcher.dispatch(new StringMessage("test2"));
}
catch (Exception e) {
// ignore
}
assertEquals(5, counter.get());
dispatcher.unsubscribe(target1);
dispatcher.dispatch(new StringMessage("test3"));
try {
dispatcher.dispatch(new StringMessage("test3"));
}
catch (Exception e) {
// ignore
}
assertEquals(6, counter.get());
}
@@ -116,9 +140,14 @@ public class SimpleDispatcherTests {
public void unsubscribeLastTargetCausesDeliveryException() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target = new CountingTestEndpoint(counter, false);
MessageConsumer target = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target);
dispatcher.dispatch(new StringMessage("test1"));
try {
dispatcher.dispatch(new StringMessage("test1"));
}
catch (Exception e) {
// ignore
}
assertEquals(1, counter.get());
dispatcher.unsubscribe(target);
dispatcher.dispatch(new StringMessage("test2"));
@@ -184,9 +213,9 @@ public class SimpleDispatcherTests {
public void firstHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target1 = new CountingTestEndpoint(counter, true);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
MessageConsumer target1 = new CountingTestEndpoint(counter, true);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -198,9 +227,9 @@ public class SimpleDispatcherTests {
public void middleHandlerReturnsTrue() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, true);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, true);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
@@ -212,13 +241,18 @@ public class SimpleDispatcherTests {
public void allHandlersReturnFalse() {
SimpleDispatcher dispatcher = new SimpleDispatcher();
final AtomicInteger counter = new AtomicInteger();
MessageEndpoint target1 = new CountingTestEndpoint(counter, false);
MessageEndpoint target2 = new CountingTestEndpoint(counter, false);
MessageEndpoint target3 = new CountingTestEndpoint(counter, false);
MessageConsumer target1 = new CountingTestEndpoint(counter, false);
MessageConsumer target2 = new CountingTestEndpoint(counter, false);
MessageConsumer target3 = new CountingTestEndpoint(counter, false);
dispatcher.subscribe(target1);
dispatcher.subscribe(target2);
dispatcher.subscribe(target3);
assertFalse(dispatcher.dispatch(new StringMessage("test")));
try {
assertFalse(dispatcher.dispatch(new StringMessage("test")));
}
catch (Exception e) {
// ignore
}
assertEquals("each target should have been invoked", 3, counter.get());
}
@@ -246,28 +280,22 @@ public class SimpleDispatcherTests {
}
private static class CountingTestEndpoint implements MessageEndpoint {
private static class CountingTestEndpoint implements MessageConsumer {
private final AtomicInteger counter;
private final boolean returnValue;
private final boolean shouldAccept;
CountingTestEndpoint(AtomicInteger counter, boolean returnValue) {
CountingTestEndpoint(AtomicInteger counter, boolean shouldAccept) {
this.counter = counter;
this.returnValue = returnValue;
this.shouldAccept = shouldAccept;
}
public boolean send(Message<?> message) {
public void onMessage(Message<?> message) {
this.counter.incrementAndGet();
return this.returnValue;
}
public String getName() {
return null;
}
public MessageSource<?> getSource() {
return null;
if (!this.shouldAccept) {
throw new MessageRejectedException(message, "intentional test failure");
}
}
}

View File

@@ -18,6 +18,7 @@ package org.springframework.integration.endpoint;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
import static org.easymock.EasyMock.verify;
@@ -26,9 +27,9 @@ import org.junit.Before;
import org.junit.Test;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.endpoint.ChannelPoller;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessageRejectedException;
import org.springframework.integration.scheduling.Schedule;
/**
@@ -40,7 +41,7 @@ public class ChannelPollerTests {
private ChannelPoller poller;
private Schedule scheduleMock = createMock(Schedule.class);
private PollableChannel channelMock = createMock(PollableChannel.class);
private MessageEndpoint endpointMock = createMock(MessageEndpoint.class);
private MessageConsumer endpointMock = createMock(MessageConsumer.class);
private Message messageMock = createMock(Message.class);
private Object[] globalMocks = new Object[] { scheduleMock, channelMock, endpointMock, messageMock };
@@ -57,7 +58,8 @@ public class ChannelPollerTests {
@Test
public void singleMessage() {
expect(channelMock.receive()).andReturn(messageMock);
expect(endpointMock.send(messageMock)).andReturn(true);
endpointMock.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
poller.setMaxMessagesPerPoll(1);
poller.run();
@@ -67,7 +69,8 @@ public class ChannelPollerTests {
@Test
public void multipleMessages() {
expect(channelMock.receive()).andReturn(messageMock).times(5);
expect(endpointMock.send(messageMock)).andReturn(true).times(5);
endpointMock.onMessage(messageMock);
expectLastCall().times(5);
replay(globalMocks);
poller.setMaxMessagesPerPoll(5);
poller.run();
@@ -78,26 +81,29 @@ public class ChannelPollerTests {
public void multipleMessages_underrun() {
expect(channelMock.receive()).andReturn(messageMock).times(5);
expect(channelMock.receive()).andReturn(null);
expect(endpointMock.send(messageMock)).andReturn(true).times(5);
endpointMock.onMessage(messageMock);
expectLastCall().times(5);
replay(globalMocks);
poller.setMaxMessagesPerPoll(6);
poller.run();
verify(globalMocks);
}
@Test
public void droppedMessage() {
@Test(expected = MessageRejectedException.class)
public void rejectedMessage() {
expect(channelMock.receive()).andReturn(messageMock);
expect(endpointMock.send(messageMock)).andReturn(false);
endpointMock.onMessage(messageMock);
expectLastCall().andThrow(new MessageRejectedException(messageMock, "intentional test failure"));
replay(globalMocks);
poller.run();
verify(globalMocks);
}
@Test
@Test(expected = MessageRejectedException.class)
public void droppedMessage_onePerPoll() {
expect(channelMock.receive()).andReturn(messageMock).times(1);
expect(endpointMock.send(messageMock)).andReturn(false).anyTimes();
endpointMock.onMessage(messageMock);
expectLastCall().andThrow(new MessageRejectedException(messageMock, "intentional test failure")).anyTimes();
replay(globalMocks);
poller.setMaxMessagesPerPoll(10);
poller.run();
@@ -121,9 +127,11 @@ public class ChannelPollerTests {
poller = new ChannelPoller(channelMock, scheduleMock);
poller.subscribe(endpointMock);
expect(channelMock.receive(1)).andReturn(messageMock);
expect(endpointMock.send(messageMock)).andReturn(false);
endpointMock.onMessage(messageMock);
expectLastCall();
replay(globalMocks);
poller.setReceiveTimeout(1);
poller.setMaxMessagesPerPoll(1);
poller.run();
verify(globalMocks);
}

View File

@@ -1,64 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.endpoint;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.integration.bus.DefaultMessageBus;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.message.PollableSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.PollingSchedule;
/**
* @author Mark Fisher
*/
public class MessagingBridgeTests {
@Test
public void simplePassThrough() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
DefaultMessageBus bus = new DefaultMessageBus();
MessagingBridge bridge = new MessagingBridge(new MessageTarget() {
public boolean send(Message<?> message) {
latch.countDown();
return true;
}
});
bridge.setBeanName("bridge");
PollableSource<String> source = new PollableSource<String>() {
public Message<String> receive() {
return new StringMessage("test");
}
};
SourcePoller poller = new SourcePoller(source, new PollingSchedule(1000));
poller.setMaxMessagesPerPoll(1);
bridge.setSource(poller);
bus.registerEndpoint(bridge);
bus.start();
latch.await(1, TimeUnit.SECONDS);
bus.stop();
assertEquals(0, latch.getCount());
}
}

View File

@@ -54,7 +54,7 @@ public class ServiceActivatorEndpointTests {
ServiceActivatorEndpoint endpoint = this.createEndpoint();
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -67,7 +67,7 @@ public class ServiceActivatorEndpointTests {
ServiceActivatorEndpoint endpoint = this.createEndpoint();
endpoint.setOutputChannel(channel1);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel2).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply1 = channel1.receive(0);
assertNotNull(reply1);
assertEquals("FOO", reply1.getPayload());
@@ -80,7 +80,7 @@ public class ServiceActivatorEndpointTests {
QueueChannel channel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -95,7 +95,7 @@ public class ServiceActivatorEndpointTests {
ServiceActivatorEndpoint endpoint = this.createEndpoint();
endpoint.setChannelRegistry(channelRegistry);
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress("testChannel").build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -118,7 +118,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setChannelRegistry(channelRegistry);
Message<String> testMessage1 = MessageBuilder.fromPayload("bar")
.setReturnAddress(replyChannel1).build();
endpoint.send(testMessage1);
endpoint.onMessage(testMessage1);
Message<?> reply1 = replyChannel1.receive(50);
assertNotNull(reply1);
assertEquals("foobar", reply1.getPayload());
@@ -126,7 +126,7 @@ public class ServiceActivatorEndpointTests {
assertNull(reply2);
Message<String> testMessage2 = MessageBuilder.fromMessage(testMessage1)
.setReturnAddress("replyChannel2").build();
endpoint.send(testMessage2);
endpoint.onMessage(testMessage2);
reply1 = replyChannel1.receive(0);
assertNull(reply1);
reply2 = replyChannel2.receive(0);
@@ -139,7 +139,7 @@ public class ServiceActivatorEndpointTests {
QueueChannel channel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.fromPayload("foo").setReturnAddress(channel).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = channel.receive(0);
assertNotNull(reply);
assertEquals("FOO", reply.getPayload());
@@ -149,7 +149,7 @@ public class ServiceActivatorEndpointTests {
public void noReplyTarget() {
ServiceActivatorEndpoint endpoint = this.createEndpoint();
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
endpoint.onMessage(message);
}
@Test
@@ -159,7 +159,7 @@ public class ServiceActivatorEndpointTests {
new TestNullReplyBean(), "handle");
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
endpoint.onMessage(message);
assertNull(channel.receive(0));
}
@@ -171,7 +171,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setRequiresReply(true);
endpoint.setOutputChannel(channel);
Message<?> message = MessageBuilder.fromPayload("foo").build();
endpoint.send(message);
endpoint.onMessage(message);
}
@Test(expected=MessageRejectedException.class)
@@ -183,7 +183,7 @@ public class ServiceActivatorEndpointTests {
return false;
}
});
endpoint.send(new StringMessage("test"));
endpoint.onMessage(new StringMessage("test"));
}
@Test
@@ -196,7 +196,7 @@ public class ServiceActivatorEndpointTests {
return true;
}
});
endpoint.send(new StringMessage("test"));
endpoint.onMessage(new StringMessage("test"));
latch.await(100, TimeUnit.MILLISECONDS);
assertEquals("handler should have been invoked", 0, latch.getCount());
}
@@ -222,7 +222,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setSelector(selectorChain);
boolean exceptionWasThrown = false;
try {
endpoint.send(new StringMessage("test"));
endpoint.onMessage(new StringMessage("test"));
}
catch (MessageRejectedException e) {
exceptionWasThrown = true;
@@ -253,7 +253,7 @@ public class ServiceActivatorEndpointTests {
endpoint.setSelector(selectorChain);
boolean exceptionWasThrown = false;
try {
endpoint.send(new StringMessage("test"));
endpoint.onMessage(new StringMessage("test"));
}
catch (MessageRejectedException e) {
exceptionWasThrown = true;
@@ -282,7 +282,7 @@ public class ServiceActivatorEndpointTests {
}
});
endpoint.setSelector(selectorChain);
assertTrue(endpoint.send(new StringMessage("test")));
endpoint.onMessage(new StringMessage("test"));
assertEquals("both selectors and handler should have been invoked", 3, counter.get());
}
@@ -297,7 +297,7 @@ public class ServiceActivatorEndpointTests {
}, "handle");
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = replyChannel.receive(500);
assertNull(reply.getHeaders().getCorrelationId());
}
@@ -313,7 +313,7 @@ public class ServiceActivatorEndpointTests {
}, "handle");
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = replyChannel.receive(500);
assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId());
}
@@ -330,7 +330,7 @@ public class ServiceActivatorEndpointTests {
}, "handle");
Message<String> message = MessageBuilder.fromPayload("test")
.setReturnAddress(replyChannel).build();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply = replyChannel.receive(500);
Object correlationId = reply.getHeaders().getCorrelationId();
assertFalse(message.getHeaders().getId().equals(correlationId));

View File

@@ -44,9 +44,9 @@ public class CorrelationIdTests {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setInputChannel(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(correlationId, reply.getHeaders().getCorrelationId());
@@ -58,9 +58,9 @@ public class CorrelationIdTests {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setInputChannel(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(message.getHeaders().getId(), reply.getHeaders().getCorrelationId());
@@ -73,9 +73,9 @@ public class CorrelationIdTests {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "upperCase");
endpoint.setSource(inputChannel);
endpoint.setInputChannel(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals(message.getHeaders().getCorrelationId(), reply.getHeaders().getCorrelationId());
@@ -90,9 +90,9 @@ public class CorrelationIdTests {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
endpoint.setSource(inputChannel);
endpoint.setInputChannel(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals("456-XYZ", reply.getHeaders().getCorrelationId());
@@ -104,9 +104,9 @@ public class CorrelationIdTests {
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel(1);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(new TestBean(), "createMessage");
endpoint.setSource(inputChannel);
endpoint.setInputChannel(inputChannel);
endpoint.setOutputChannel(outputChannel);
endpoint.afterPropertiesSet();
endpoint.start();
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
assertEquals("456-XYZ", reply.getHeaders().getCorrelationId());
@@ -121,7 +121,7 @@ public class CorrelationIdTests {
SplitterEndpoint endpoint = new SplitterEndpoint(splitter);
endpoint.setOutputChannel(testChannel);
splitter.afterPropertiesSet();
endpoint.send(message);
endpoint.onMessage(message);
Message<?> reply1 = testChannel.receive(100);
Message<?> reply2 = testChannel.receive(100);
assertEquals(message.getHeaders().getId(), reply1.getHeaders().getCorrelationId());

View File

@@ -64,9 +64,9 @@ public class MessageFilterTests {
return true;
}
});
filter.setSource(inputChannel);
filter.setInputChannel(inputChannel);
filter.setOutputChannel(outputChannel);
filter.afterPropertiesSet();
filter.start();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));
Message<?> reply = outputChannel.receive(0);
@@ -83,9 +83,9 @@ public class MessageFilterTests {
return false;
}
});
filter.setSource(inputChannel);
filter.setInputChannel(inputChannel);
filter.setOutputChannel(outputChannel);
filter.afterPropertiesSet();
filter.start();
Message<?> message = new StringMessage("test");
assertTrue(inputChannel.send(message));
assertNull(outputChannel.receive(0));

View File

@@ -91,7 +91,7 @@ public class MethodInvokingTargetTests {
bus.registerChannel(channel);
ServiceActivatorEndpoint endpoint = new ServiceActivatorEndpoint(target);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(channel);
endpoint.setInputChannel(channel);
bus.registerEndpoint(endpoint);
bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);

View File

@@ -53,7 +53,7 @@ public class MessageExchangeTemplateTests {
MessageBus bus = new DefaultMessageBus();
bus.registerChannel(requestChannel);
endpoint.setBeanName("testEndpoint");
endpoint.setSource(requestChannel);
endpoint.setInputChannel(requestChannel);
bus.registerEndpoint(endpoint);
bus.start();
}

View File

@@ -17,10 +17,8 @@
package org.springframework.integration.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -57,7 +55,7 @@ public class MethodInvokingRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new GenericMessage<String>("bar");
assertTrue(endpoint.send(message));
endpoint.onMessage(message);
Message<?> replyMessage = barChannel.receive();
assertNotNull(replyMessage);
assertEquals(message, replyMessage);
@@ -74,7 +72,7 @@ public class MethodInvokingRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new GenericMessage<String>("bar");
assertTrue(endpoint.send(message));
endpoint.onMessage(message);
Message<?> replyMessage = barChannel.receive();
assertNotNull(replyMessage);
assertEquals(message, replyMessage);
@@ -96,7 +94,7 @@ public class MethodInvokingRouterTests {
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = MessageBuilder.fromPayload("bar")
.setHeader("targetChannel", "foo").build();
assertTrue(endpoint.send(message));
endpoint.onMessage(message);
Message<?> fooReply = fooChannel.receive(0);
Message<?> barReply = barChannel.receive(0);
assertNotNull(fooReply);
@@ -110,7 +108,7 @@ public class MethodInvokingRouterTests {
Method routingMethod = testBean.getClass().getMethod("routeByHeader", String.class);
MethodInvokingChannelResolver resolver = new MethodInvokingChannelResolver(testBean, routingMethod);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.send(new GenericMessage<String>("testing"));
endpoint.onMessage(new GenericMessage<String>("testing"));
}
@Test
@@ -142,15 +140,15 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -183,15 +181,15 @@ public class MethodInvokingRouterTests {
channelRegistry.registerChannel(fooChannel);
channelRegistry.registerChannel(barChannel);
endpoint.setChannelRegistry(channelRegistry);
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -224,15 +222,15 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1 = fooChannel.receive(0);
assertNotNull(result1);
assertEquals("foo", result1.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2 = barChannel.receive(0);
assertNotNull(result2);
assertEquals("bar", result2.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -265,21 +263,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -312,21 +310,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -359,21 +357,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -406,21 +404,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -453,21 +451,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}
@Test
@@ -500,21 +498,21 @@ public class MethodInvokingRouterTests {
Message<String> fooMessage = new StringMessage("foo");
Message<String> barMessage = new StringMessage("bar");
Message<String> badMessage = new StringMessage("bad");
assertTrue(endpoint.send(fooMessage));
endpoint.onMessage(fooMessage);
Message<?> result1a = fooChannel.receive(0);
Message<?> result1b = barChannel.receive(0);
assertNotNull(result1a);
assertEquals("foo", result1a.getPayload());
assertNotNull(result1b);
assertEquals("foo", result1b.getPayload());
assertTrue(endpoint.send(barMessage));
endpoint.onMessage(barMessage);
Message<?> result2a = fooChannel.receive(0);
Message<?> result2b = barChannel.receive(0);
assertNotNull(result2a);
assertEquals("bar", result2a.getPayload());
assertNotNull(result2b);
assertEquals("bar", result2b.getPayload());
assertFalse(endpoint.send(badMessage));
endpoint.onMessage(badMessage);
}

View File

@@ -51,7 +51,7 @@ public class MultiChannelRouterTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -77,7 +77,7 @@ public class MultiChannelRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -97,7 +97,7 @@ public class MultiChannelRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
@Test(expected = MessagingException.class)
@@ -109,7 +109,7 @@ public class MultiChannelRouterTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
}

View File

@@ -71,8 +71,8 @@ public class PayloadTypeRouterTests {
endpoint.setChannelRegistry(channelRegistry);
Message<String> message1 = new StringMessage("test");
Message<Integer> message2 = new GenericMessage<Integer>(123);
endpoint.send(message1);
endpoint.send(message2);
endpoint.onMessage(message1);
endpoint.onMessage(message2);
Message<?> reply1 = stringChannel.receive(0);
Message<?> reply2 = integerChannel.receive(0);
assertEquals("test", reply1.getPayload());
@@ -96,8 +96,8 @@ public class PayloadTypeRouterTests {
endpoint.setDefaultOutputChannel(defaultChannel);
Message<String> message1 = new StringMessage("test");
Message<Integer> message2 = new GenericMessage<Integer>(123);
endpoint.send(message1);
endpoint.send(message2);
endpoint.onMessage(message1);
endpoint.onMessage(message2);
Message<?> result1 = stringChannel.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());

View File

@@ -69,7 +69,7 @@ public class RecipientListRouterTests {
resolver.afterPropertiesSet();
RouterEndpoint endpoint = new RouterEndpoint(resolver);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -93,7 +93,7 @@ public class RecipientListRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());
@@ -117,7 +117,7 @@ public class RecipientListRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result1 = channel1.receive(25);
assertNotNull(result1);
assertEquals("test", result1.getPayload());

View File

@@ -65,7 +65,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));
@@ -87,7 +87,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(runtimeExceptionChannel.receive(1000));
assertNull(illegalArgumentChannel.receive(0));
assertNull(defaultChannel.receive(0));
@@ -108,7 +108,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(messageHandlingExceptionChannel.receive(1000));
assertNull(runtimeExceptionChannel.receive(0));
assertNull(illegalArgumentChannel.receive(0));
@@ -125,7 +125,7 @@ public class RootCauseErrorMessageRouterTests {
RootCauseErrorMessageChannelResolver resolver = new RootCauseErrorMessageChannelResolver();
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(defaultChannel.receive(1000));
assertNull(runtimeExceptionChannel.receive(0));
assertNull(illegalArgumentChannel.receive(0));
@@ -146,7 +146,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setResolutionRequired(true);
endpoint.send(message);
endpoint.onMessage(message);
}
@Test
@@ -165,7 +165,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));
@@ -187,7 +187,7 @@ public class RootCauseErrorMessageRouterTests {
resolver.setChannelMappings(channelMappings);
RouterEndpoint endpoint = new RouterEndpoint(resolver);
endpoint.setDefaultOutputChannel(defaultChannel);
endpoint.send(message);
endpoint.onMessage(message);
assertNotNull(illegalArgumentChannel.receive(1000));
assertNull(defaultChannel.receive(0));
assertNull(runtimeExceptionChannel.receive(0));

View File

@@ -16,8 +16,6 @@
package org.springframework.integration.router;
import static org.junit.Assert.assertFalse;
import java.util.Collections;
import java.util.List;
@@ -45,7 +43,7 @@ public class RouterEndpointTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
Message<String> message = new StringMessage("test");
assertFalse(endpoint.send(message));
endpoint.onMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -58,7 +56,7 @@ public class RouterEndpointTests {
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
endpoint.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
@Test
@@ -70,7 +68,7 @@ public class RouterEndpointTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
Message<String> message = new StringMessage("test");
assertFalse(endpoint.send(message));
endpoint.onMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -83,7 +81,7 @@ public class RouterEndpointTests {
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
endpoint.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
@Test
@@ -97,7 +95,7 @@ public class RouterEndpointTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
assertFalse(endpoint.send(message));
endpoint.onMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -112,7 +110,7 @@ public class RouterEndpointTests {
endpoint.setChannelRegistry(channelRegistry);
endpoint.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
@@ -127,7 +125,7 @@ public class RouterEndpointTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
assertFalse(endpoint.send(message));
endpoint.onMessage(message);
}
@Test(expected = MessageDeliveryException.class)
@@ -142,7 +140,7 @@ public class RouterEndpointTests {
endpoint.setChannelRegistry(channelRegistry);
endpoint.setResolutionRequired(true);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
@Test(expected = MessagingException.class)
@@ -153,7 +151,7 @@ public class RouterEndpointTests {
}
};
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.send(new StringMessage("this should fail"));
endpoint.onMessage(new StringMessage("this should fail"));
}
@Test(expected = MessagingException.class)
@@ -164,7 +162,7 @@ public class RouterEndpointTests {
}
};
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.send(new StringMessage("this should fail"));
endpoint.onMessage(new StringMessage("this should fail"));
}
@Test(expected = IllegalArgumentException.class)

View File

@@ -17,7 +17,6 @@
package org.springframework.integration.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
@@ -45,7 +44,7 @@ public class SingleChannelRouterTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result = channel.receive(25);
assertNotNull(result);
assertEquals("test", result.getPayload());
@@ -65,14 +64,14 @@ public class SingleChannelRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
Message<?> result = channel.receive(25);
assertNotNull(result);
assertEquals("test", result.getPayload());
}
@Test
public void nullChannelResult() {
public void nullChannelResultIgnored() {
AbstractSingleChannelResolver channelResolver = new AbstractSingleChannelResolver() {
public MessageChannel resolveChannel(Message<?> message) {
return null;
@@ -80,7 +79,7 @@ public class SingleChannelRouterTests {
};
RouterEndpoint endpoint = new RouterEndpoint(channelResolver);
Message<String> message = new StringMessage("test");
assertFalse(endpoint.send(message));
endpoint.onMessage(message);
}
@Test(expected = MessagingException.class)
@@ -94,7 +93,7 @@ public class SingleChannelRouterTests {
RouterEndpoint endpoint = new RouterEndpoint(channelNameResolver);
endpoint.setChannelRegistry(channelRegistry);
Message<String> message = new StringMessage("test");
endpoint.send(message);
endpoint.onMessage(message);
}
}