diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java
index ce93d232e3..41ba925e86 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
@@ -47,18 +48,19 @@ import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.factory.ChannelFactory;
import org.springframework.integration.channel.factory.QueueChannelFactory;
-import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.DefaultEndpointRegistry;
import org.springframework.integration.endpoint.EndpointRegistry;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.MessagingGateway;
+import org.springframework.integration.endpoint.MessageProducingEndpoint;
import org.springframework.integration.endpoint.SourceEndpoint;
+import org.springframework.integration.endpoint.MessageConsumingEndpoint;
import org.springframework.integration.endpoint.TargetEndpoint;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.message.CommandMessage;
-import org.springframework.integration.message.PollCommand;
import org.springframework.integration.message.MessageTarget;
+import org.springframework.integration.message.PollCommand;
import org.springframework.integration.scheduling.MessagePublishingErrorHandler;
import org.springframework.integration.scheduling.MessagingTask;
import org.springframework.integration.scheduling.MessagingTaskScheduler;
@@ -99,8 +101,6 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
private volatile ScheduledExecutorService executor;
- private volatile ConcurrencyPolicy defaultConcurrencyPolicy;
-
private volatile boolean configureAsyncEventMulticaster = false;
private volatile boolean autoCreateChannels = false;
@@ -144,14 +144,6 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
this.executor = executor;
}
- /**
- * Specify the default concurrency policy to be used for any endpoint that
- * is registered without an explicitly provided policy of its own.
- */
- public void setDefaultConcurrencyPolicy(ConcurrencyPolicy defaultConcurrencyPolicy) {
- this.defaultConcurrencyPolicy = defaultConcurrencyPolicy;
- }
-
/**
* Set whether to automatically start the bus after initialization.
*
Default is 'true'; set this to 'false' to allow for manual startup
@@ -220,7 +212,6 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
if (this.getErrorChannel() == null) {
this.setErrorChannel(new DefaultErrorChannel());
}
- this.taskScheduler.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel()));
this.initialized = true;
this.initializing = false;
}
@@ -263,32 +254,20 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
}
public void registerHandler(String name, MessageHandler handler, Subscription subscription) {
- this.registerHandler(name, handler, subscription, this.defaultConcurrencyPolicy);
- }
-
- public void registerHandler(String name, MessageHandler handler, Subscription subscription,
- ConcurrencyPolicy concurrencyPolicy) {
Assert.notNull(handler, "'handler' must not be null");
HandlerEndpoint endpoint = new HandlerEndpoint(handler);
- this.doRegisterEndpoint(name, endpoint, subscription, concurrencyPolicy);
+ this.doRegisterEndpoint(name, endpoint, subscription);
}
public void registerTarget(String name, MessageTarget target, Subscription subscription) {
- this.registerTarget(name, target, subscription, this.defaultConcurrencyPolicy);
- }
-
- public void registerTarget(String name, MessageTarget target, Subscription subscription,
- ConcurrencyPolicy concurrencyPolicy) {
Assert.notNull(target, "'target' must not be null");
TargetEndpoint endpoint = new TargetEndpoint(target);
- this.doRegisterEndpoint(name, endpoint, subscription, concurrencyPolicy);
+ this.doRegisterEndpoint(name, endpoint, subscription);
}
- private void doRegisterEndpoint(String name, TargetEndpoint endpoint, Subscription subscription,
- ConcurrencyPolicy concurrencyPolicy) {
+ private void doRegisterEndpoint(String name, TargetEndpoint endpoint, Subscription subscription) {
endpoint.setName(name);
endpoint.setSubscription(subscription);
- endpoint.setConcurrencyPolicy(concurrencyPolicy);
this.registerEndpoint(name, endpoint);
}
@@ -299,10 +278,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
if (endpoint instanceof ChannelRegistryAware) {
((ChannelRegistryAware) endpoint).setChannelRegistry(this.channelRegistry);
}
- if (endpoint instanceof TargetEndpoint) {
- this.registerTargetEndpoint((TargetEndpoint) endpoint);
- }
- else if (endpoint instanceof SourceEndpoint) {
+ if (endpoint instanceof SourceEndpoint) {
this.registerSourceEndpoint(name, (SourceEndpoint) endpoint);
}
this.endpointRegistry.registerEndpoint(name, endpoint);
@@ -314,12 +290,6 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
}
}
- private void registerTargetEndpoint(TargetEndpoint endpoint) {
- if (endpoint.getConcurrencyPolicy() == null && this.defaultConcurrencyPolicy != null) {
- endpoint.setConcurrencyPolicy(this.defaultConcurrencyPolicy);
- }
- }
-
public MessageEndpoint unregisterEndpoint(String name) {
MessageEndpoint endpoint = this.endpointRegistry.unregisterEndpoint(name);
if (endpoint == null) {
@@ -357,22 +327,41 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
}
private void activateEndpoint(MessageEndpoint endpoint) {
- if (endpoint instanceof TargetEndpoint) {
- this.activateTargetEndpoint((TargetEndpoint) endpoint);
+ if (endpoint instanceof MessageProducingEndpoint) {
+ String channelName = ((MessageProducingEndpoint) endpoint).getOutputChannelName();
+ if (channelName != null && this.lookupChannel(channelName) == null) {
+ if (!this.autoCreateChannels) {
+ throw new ConfigurationException("Unknown channel '" + channelName
+ + "' configured as output channel for endpoint '" + endpoint
+ + "'. Consider enabling the 'autoCreateChannels' option for the message bus.");
+ }
+ this.registerChannel(channelName, new QueueChannel());
+ }
+ }
+ if (endpoint instanceof InitializingBean) {
+ try {
+ ((InitializingBean) endpoint).afterPropertiesSet();
+ }
+ catch (Exception e) {
+ throw new ConfigurationException("failed to initialize endpoint", e);
+ }
+ }
+ if (endpoint instanceof MessageConsumingEndpoint && endpoint instanceof MessageTarget) {
+ this.activateSubscriber((MessageConsumingEndpoint) endpoint);
}
}
- private void activateTargetEndpoint(TargetEndpoint endpoint) {
- Subscription subscription = endpoint.getSubscription();
+ private void activateSubscriber(MessageConsumingEndpoint subscriber) {
+ Subscription subscription = subscriber.getSubscription();
if (subscription == null) {
- throw new ConfigurationException("Unable to register endpoint '" + endpoint
+ throw new ConfigurationException("Unable to register endpoint '" + subscriber
+ "'. No subscription information is available.");
}
MessageChannel channel = subscription.getChannel();
if (channel == null) {
String channelName = subscription.getChannelName();
if (channelName == null) {
- throw new ConfigurationException("endpoint '" + endpoint
+ throw new ConfigurationException("endpoint '" + subscriber
+ "' must provide either 'channel' or 'channelName' in its subscription metadata");
}
channel = this.lookupChannel(channelName);
@@ -388,27 +377,10 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
this.registerChannel(channelName, channel);
}
}
- if (endpoint instanceof HandlerEndpoint) {
- HandlerEndpoint handlerEndpoint = (HandlerEndpoint) endpoint;
- String outputChannelName = handlerEndpoint.getOutputChannelName();
- if (outputChannelName != null && this.lookupChannel(outputChannelName) == null) {
- if (!this.autoCreateChannels) {
- throw new ConfigurationException("Unknown channel '" + outputChannelName
- + "' configured as output channel for endpoint '" + endpoint
- + "'. Consider enabling the 'autoCreateChannels' option for the message bus.");
- }
- this.registerChannel(outputChannelName, new QueueChannel());
- }
- }
- if (!endpoint.hasErrorHandler() && this.getErrorChannel() != null && !this.getErrorChannel().equals(channel)) {
- endpoint.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel()));
- }
- endpoint.afterPropertiesSet();
- this.activateSubscription(channel, endpoint, subscription.getSchedule());
+ this.activateSubscription(channel, (MessageTarget) subscriber, subscription.getSchedule());
if (logger.isInfoEnabled()) {
- logger
- .info("activated subscription to channel '" + channel.getName() + "' for endpoint '" + endpoint
- + "'");
+ logger.info("activated subscription to channel '" + channel.getName()
+ + "' for endpoint '" + subscriber + "' of type '" + subscriber.getClass() + "'");
}
}
@@ -447,7 +419,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
}
}
- private void activateSubscription(MessageChannel channel, TargetEndpoint targetEndpoint, Schedule schedule) {
+ private void activateSubscription(MessageChannel channel, MessageTarget target, Schedule schedule) {
SubscriptionManager manager = this.subscriptionManagers.get(channel);
if (manager == null) {
if (logger.isWarnEnabled()) {
@@ -456,7 +428,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
}
return;
}
- manager.addTarget(targetEndpoint, schedule);
+ manager.addTarget(target, schedule);
if (this.isRunning() && !manager.isRunning()) {
manager.start();
}
@@ -479,6 +451,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio
this.starting = true;
synchronized (this.lifecycleMonitor) {
this.activateEndpoints();
+ this.taskScheduler.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel()));
this.taskScheduler.start();
for (SubscriptionManager manager : this.subscriptionManagers.values()) {
manager.start();
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java
index 724b2d511c..064b1fddcd 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusAwareBeanPostProcessor.java
@@ -44,6 +44,9 @@ public class MessageBusAwareBeanPostProcessor implements BeanPostProcessor {
}
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof MessageBusAware) {
+ ((MessageBusAware) bean).setMessageBus(messageBus);
+ }
return bean;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java
index 5b45eec68a..c8fcb6cb41 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/SubscriptionManager.java
@@ -30,13 +30,10 @@ import org.springframework.integration.ConfigurationException;
import org.springframework.integration.channel.MessageChannel;
import org.springframework.integration.dispatcher.DirectChannel;
import org.springframework.integration.dispatcher.PollingDispatcherTask;
-import org.springframework.integration.endpoint.TargetEndpoint;
-import org.springframework.integration.message.MessagingException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.scheduling.MessagingTaskScheduler;
import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.integration.scheduling.Schedule;
-import org.springframework.integration.util.ErrorHandler;
import org.springframework.util.Assert;
/**
@@ -88,7 +85,7 @@ public class SubscriptionManager {
}
else if (this.channel instanceof DirectChannel) {
if (logger.isInfoEnabled()) {
- logger.info("Subscribing to a SynchronousChannel. The provided schedule will be ignored.");
+ logger.info("Subscribing to a DirectChannel. The provided schedule will be ignored.");
}
}
else if (this.channel.getDispatcherPolicy().isPublishSubscribe()) {
@@ -107,16 +104,6 @@ public class SubscriptionManager {
}
if (this.channel instanceof DirectChannel) {
((DirectChannel) this.channel).subscribe(target);
- if (target instanceof TargetEndpoint) {
- ((TargetEndpoint) target).setErrorHandler(new ErrorHandler() {
- public void handle(Throwable t) {
- if (t instanceof MessagingException) {
- throw (MessagingException) t;
- }
- throw new MessagingException("error occurred in handler", t);
- }
- });
- }
return;
}
PollingDispatcherTask dispatcherTask = this.dispatcherTasks.get(schedule);
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
index be5369f9ef..14a03a5a7e 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
@@ -32,7 +32,9 @@ import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
+import org.springframework.integration.ConfigurationException;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
+import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor;
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
import org.springframework.transaction.interceptor.NoRollbackRuleAttribute;
import org.springframework.transaction.interceptor.RollbackRuleAttribute;
@@ -102,6 +104,10 @@ public abstract class IntegrationNamespaceUtils {
String txInterceptorBeanName = parseTransactionInterceptor(childElement, parserContext);
interceptors.add(new RuntimeBeanReference(txInterceptorBeanName));
}
+ else if ("concurrency-interceptor".equals(localName)) {
+ String concurrencyInterceptorBeanName = parseConcurrencyInterceptor(childElement, parserContext);
+ interceptors.add(new RuntimeBeanReference(concurrencyInterceptorBeanName));
+ }
}
}
return interceptors;
@@ -161,6 +167,24 @@ public abstract class IntegrationNamespaceUtils {
return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
}
+ private static String parseConcurrencyInterceptor(Element element, ParserContext parserContext) {
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ConcurrencyInterceptor.class);
+ String taskExecutorRef = element.getAttribute("task-executor");
+ if (StringUtils.hasText(taskExecutorRef)) {
+ if (element.getAttributes().getLength() != 1) {
+ parserContext.getReaderContext().error("No other attributes are permitted when "
+ + "specifying a 'task-executor' reference on the element.",
+ parserContext.extractSource(element));
+ }
+ builder.addConstructorArgReference(taskExecutorRef);
+ }
+ else {
+ ConcurrencyPolicy policy = IntegrationNamespaceUtils.parseConcurrencyPolicy(element);
+ builder.addConstructorArgValue(policy);
+ }
+ return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
+ }
+
/**
* Populates the property identified by propertyName on the bean definition
* to the value of the attribute specified by attributeName, if that
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java
index 26afeb68a8..7b8761b689 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java
@@ -46,7 +46,7 @@ public class MessageEndpointBeanPostProcessor implements BeanPostProcessor {
if (interceptors.size() > 0) {
ProxyFactory proxyFactory = new ProxyFactory(endpoint);
for (Advice interceptor : interceptors) {
- proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(interceptor));
+ proxyFactory.addAdvisor(new EndpointMethodAdvisor(interceptor));
}
bean = proxyFactory.getProxy();
}
@@ -56,16 +56,16 @@ public class MessageEndpointBeanPostProcessor implements BeanPostProcessor {
@SuppressWarnings("serial")
- private static class EndpointInvokeMethodAdvisor extends StaticMethodMatcherPointcutAdvisor {
+ private static class EndpointMethodAdvisor extends StaticMethodMatcherPointcutAdvisor {
- EndpointInvokeMethodAdvisor(Advice advice) {
+ EndpointMethodAdvisor(Advice advice) {
super(advice);
}
@SuppressWarnings("unchecked")
public boolean matches(Method method, Class clazz) {
- return method.getName().equals("invoke")
+ return (method.getName().equals("invoke") || method.getName().equals("send"))
&& method.getParameterTypes().length == 1
&& method.getParameterTypes()[0].equals(Message.class);
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java
index 2e1d8e893e..47e9218237 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/HandlerAnnotationPostProcessor.java
@@ -41,6 +41,7 @@ import org.springframework.integration.channel.ChannelRegistryAware;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.endpoint.MessageEndpoint;
+import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.MessageHandlerChain;
import org.springframework.integration.handler.config.DefaultMessageHandlerCreator;
@@ -111,6 +112,7 @@ public class HandlerAnnotationPostProcessor extends AbstractAnnotationMethodPost
return handler;
}
+ @SuppressWarnings("unchecked")
protected MessageHandler processResults(List results) {
MessageHandlerChain handlerChain = new MessageHandlerChain();
for (MessageHandler handler : results) {
@@ -146,7 +148,7 @@ public class HandlerAnnotationPostProcessor extends AbstractAnnotationMethodPost
concurrencyAnnotation.coreSize(), concurrencyAnnotation.maxSize());
concurrencyPolicy.setKeepAliveSeconds(concurrencyAnnotation.keepAliveSeconds());
concurrencyPolicy.setQueueCapacity(concurrencyAnnotation.queueCapacity());
- endpoint.setConcurrencyPolicy(concurrencyPolicy);
+ endpoint.addInterceptor(new ConcurrencyInterceptor(concurrencyPolicy));
}
return endpoint;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TargetAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TargetAnnotationPostProcessor.java
index 633aa8e1b6..4fdfa5079a 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TargetAnnotationPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/TargetAnnotationPostProcessor.java
@@ -28,6 +28,7 @@ import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.TargetEndpoint;
+import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor;
import org.springframework.integration.handler.MethodInvokingTarget;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.scheduling.Subscription;
@@ -70,7 +71,7 @@ public class TargetAnnotationPostProcessor extends AbstractAnnotationMethodPostP
concurrencyAnnotation.maxSize());
concurrencyPolicy.setKeepAliveSeconds(concurrencyAnnotation.keepAliveSeconds());
concurrencyPolicy.setQueueCapacity(concurrencyAnnotation.queueCapacity());
- endpoint.setConcurrencyPolicy(concurrencyPolicy);
+ endpoint.addInterceptor(new ConcurrencyInterceptor(concurrencyPolicy));
}
return endpoint;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
index 670f50110f..b6bb72f416 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
@@ -25,7 +25,6 @@
-
@@ -304,12 +303,13 @@
-
+
- Defines a concurrency policy.
+ Defines a ConcurrencyInterceptor for endpoints.
+
@@ -415,11 +415,9 @@
-
-
@@ -453,6 +451,7 @@
+
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
index d4f48c7b33..8d94a5dec3 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
@@ -59,18 +59,23 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware
return (this.name != null) ? this.name : super.toString();
}
+ public void addInterceptor(Object interceptor) {
+ if (interceptor instanceof Advice) {
+ this.interceptors.add((Advice) interceptor);
+ }
+ else if (interceptor instanceof EndpointInterceptor) {
+ this.interceptors.add(new EndpointMethodInterceptor((EndpointInterceptor) interceptor));
+ }
+ else {
+ throw new ConfigurationException("Interceptor must implement either "
+ + "'" + Advice.class.getName() + "' or '" + EndpointInterceptor.class.getName() + "'.");
+ }
+ }
+
public void setInterceptors(List