diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/filecopy/fileCopyDemo-common.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/filecopy/fileCopyDemo-common.xml
index f7aaf1918c..1d83a97169 100644
--- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/filecopy/fileCopyDemo-common.xml
+++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/filecopy/fileCopyDemo-common.xml
@@ -16,7 +16,7 @@
-
+
diff --git a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/oddeven/oddEvenDemo.xml b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/oddeven/oddEvenDemo.xml
index 434ce922eb..c2cb4efa16 100644
--- a/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/oddeven/oddEvenDemo.xml
+++ b/org.springframework.integration.samples/src/main/java/org/springframework/integration/samples/oddeven/oddEvenDemo.xml
@@ -16,7 +16,7 @@
-
+
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java
index 0c421ae3c6..e7bea07ad5 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/DefaultMessageBus.java
@@ -344,6 +344,11 @@ public class DefaultMessageBus implements MessageBus, ApplicationContextAware, A
}
if (source != null && source instanceof SubscribableSource) {
((SubscribableSource) source).subscribe(endpoint);
+ if (source instanceof PollingDispatcher) {
+ PollingDispatcher poller = (PollingDispatcher) source;
+ this.pollingDispatchers.add(poller);
+ this.taskScheduler.schedule(poller);
+ }
if (logger.isInfoEnabled()) {
logger.info("activated subscription to channel '"
+ source + "' for endpoint '" + endpoint + "'");
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractHandlerEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractHandlerEndpointParser.java
index 143eb61f98..77dd9c19eb 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractHandlerEndpointParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractHandlerEndpointParser.java
@@ -17,8 +17,6 @@
package org.springframework.integration.config;
import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.RuntimeBeanReference;
@@ -31,9 +29,8 @@ import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.endpoint.HandlerEndpoint;
import org.springframework.integration.handler.MessageHandler;
-import org.springframework.integration.scheduling.PollingSchedule;
-import org.springframework.integration.scheduling.Schedule;
import org.springframework.util.StringUtils;
+import org.springframework.util.xml.DomUtils;
/**
* Base class parser for elements that create handler-invoking endpoints.
@@ -52,10 +49,6 @@ public abstract class AbstractHandlerEndpointParser extends AbstractSingleBeanDe
protected static final String RETURN_ADDRESS_OVERRIDES_ATTRIBUTE = "return-address-overrides";
- private static final String PERIOD_ATTRIBUTE = "period";
-
- private static final String SCHEDULE_ELEMENT = "schedule";
-
private static final String POLLER_ELEMENT = "poller";
private static final String SELECTOR_ATTRIBUTE = "selector";
@@ -96,32 +89,24 @@ public abstract class AbstractHandlerEndpointParser extends AbstractSingleBeanDe
else {
builder.addConstructorArgReference(ref);
}
- Schedule schedule = null;
- NodeList childNodes = element.getChildNodes();
- for (int i = 0; i < childNodes.getLength(); i++) {
- Node child = childNodes.item(i);
- if (child.getNodeType() == Node.ELEMENT_NODE) {
- Element childElement = (Element) child;
- String localName = child.getLocalName();
- if (SCHEDULE_ELEMENT.equals(localName)) {
- schedule = this.parseSchedule(childElement);
- }
- else if (POLLER_ELEMENT.equals(localName)) {
- builder.addPropertyReference("messageExchangeTemplate",
- IntegrationNamespaceUtils.parsePoller(childElement, parserContext));
- }
- else if (INTERCEPTORS_ELEMENT.equals(localName)) {
- EndpointInterceptorParser parser = new EndpointInterceptorParser();
- ManagedList interceptors = parser.parseInterceptors(childElement, parserContext);
- builder.addPropertyValue("interceptors", interceptors);
- }
- }
+ String inputChannel = element.getAttribute(INPUT_CHANNEL_ATTRIBUTE);
+ if (!StringUtils.hasText(inputChannel)) {
+ throw new ConfigurationException("the '" + INPUT_CHANNEL_ATTRIBUTE + "' attribute is required");
}
- if (schedule != null) {
- builder.addPropertyValue("schedule", schedule);
+ Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT);
+ if (pollerElement != null) {
+ String pollerBeanName = IntegrationNamespaceUtils.parsePoller(inputChannel, pollerElement, parserContext);
+ builder.addPropertyReference("source", pollerBeanName);
+ }
+ else {
+ builder.addPropertyReference("source", inputChannel);
+ }
+ Element interceptorsElement = DomUtils.getChildElementByTagName(element, INTERCEPTORS_ELEMENT);
+ if (interceptorsElement != null) {
+ EndpointInterceptorParser parser = new EndpointInterceptorParser();
+ ManagedList interceptors = parser.parseInterceptors(interceptorsElement, parserContext);
+ builder.addPropertyValue("interceptors", interceptors);
}
- IntegrationNamespaceUtils.setValueIfAttributeDefined(
- builder, element, INPUT_CHANNEL_ATTRIBUTE, "inputChannelName");
IntegrationNamespaceUtils.setValueIfAttributeDefined(
builder, element, OUTPUT_CHANNEL_ATTRIBUTE, "outputChannelName");
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, SELECTOR_ATTRIBUTE);
@@ -151,13 +136,4 @@ public abstract class AbstractHandlerEndpointParser extends AbstractSingleBeanDe
return adapterBeanName;
}
- private Schedule parseSchedule(Element scheduleElement) {
- PollingSchedule schedule = new PollingSchedule(0);
- String period = scheduleElement.getAttribute(PERIOD_ATTRIBUTE);
- if (StringUtils.hasText(period)) {
- schedule.setPeriod(Integer.parseInt(period));
- }
- return schedule;
- }
-
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
index 8d811167e6..51bbaefc70 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java
@@ -25,8 +25,11 @@ import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.Conventions;
+import org.springframework.integration.dispatcher.PollingDispatcher;
+import org.springframework.integration.dispatcher.SimpleDispatcher;
import org.springframework.integration.message.AsyncMessageExchangeTemplate;
import org.springframework.integration.message.MessageExchangeTemplate;
+import org.springframework.integration.scheduling.PollingSchedule;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.StringUtils;
import org.springframework.util.xml.DomUtils;
@@ -135,26 +138,30 @@ public abstract class IntegrationNamespaceUtils {
/**
* Parse a "poller" element and return the bean name of the poller instance.
- * The name will be generated for a newly created bean, or if the poller
- * element simply provides a "ref", that value will be returned.
*
+ * @param sourceBeanName the name of the PollableSource bean
* @param element the "poller" element to parse
* @param parserContext the parserContext for registering a newly created bean definition
* @return the name of the poller bean definition
*/
- public static String parsePoller(Element element, ParserContext parserContext) {
- String ref = element.getAttribute("ref");
+ public static String parsePoller(String sourceBeanName, Element element, ParserContext parserContext) {
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(PollingDispatcher.class);
+ Long period = Long.valueOf(element.getAttribute("period"));
+ PollingSchedule schedule = new PollingSchedule(period);
+ String templateBeanName = parseMessageExhangeTemplate(element, parserContext);
+ builder.addConstructorArgReference(sourceBeanName);
+ builder.addConstructorArgValue(schedule);
+ builder.addConstructorArgValue(new SimpleDispatcher());
+ builder.addConstructorArgReference(templateBeanName);
+ setValueIfAttributeDefined(builder, element, "receive-timeout");
+ setValueIfAttributeDefined(builder, element, "send-timeout");
+ setValueIfAttributeDefined(builder, element, "max-messages-per-poll");
+ return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
+ }
+
+ private static String parseMessageExhangeTemplate(Element element, ParserContext parserContext) {
String taskExecutorRef = element.getAttribute("task-executor");
Element txElement = DomUtils.getChildElementByTagName(element, "transactional");
- if (StringUtils.hasText(ref)) {
- if (StringUtils.hasText(taskExecutorRef) || (txElement != null)) {
- parserContext.getReaderContext().error(
- "Neither the 'task-executor' attribute or 'transactional' sub-element "
- + "should be provided when using the 'ref' attribute.",
- parserContext.extractSource(element));
- }
- return ref;
- }
Class> beanClass = (StringUtils.hasText(taskExecutorRef)) ?
AsyncMessageExchangeTemplate.class : MessageExchangeTemplate.class;
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(beanClass);
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
index babfdfa896..eaa5772f8b 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd
@@ -207,7 +207,6 @@
-
@@ -219,17 +218,6 @@
-
-
-
-
- Defines a schedule.
-
-
-
-
-
-
@@ -240,10 +228,11 @@
-
-
+
+
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml
index 876cfa3af1..0b867c3086 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml
@@ -17,7 +17,7 @@
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
-
+
@@ -28,7 +28,7 @@
input-channel="testChannel"
ref="testHandler"
output-channel="replyChannel">
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml
index 7db153c6cd..4f269ddb4c 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithHandlerChainElement.xml
@@ -20,7 +20,7 @@
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml
index 4f5c4d78f4..8bcdc28bab 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointWithSelector.xml
@@ -13,7 +13,7 @@
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml
index 10b9613034..2059ed410a 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/handlerAdapterEndpointTests.xml
@@ -12,7 +12,7 @@
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/simpleEndpointTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/simpleEndpointTests.xml
index 827d49c7ec..ba32d5fe0f 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/simpleEndpointTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/simpleEndpointTests.xml
@@ -12,7 +12,7 @@
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorPropagationTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorPropagationTests.xml
index 10434ba1c6..221b577eec 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorPropagationTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorPropagationTests.xml
@@ -17,7 +17,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+
@@ -28,7 +28,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+
@@ -39,7 +39,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+
@@ -50,7 +50,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+
@@ -61,7 +61,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorTests.xml
index 56be721c69..58b2b6716c 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/interceptor/transactionInterceptorTests.xml
@@ -17,7 +17,7 @@
ref="testBean"
method="bad"
output-channel="output">
-
+
@@ -27,7 +27,7 @@
ref="testBean"
method="good"
output-channel="output">
-
+