diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml index 5ecb6241fa..9dc605bbbf 100644 --- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml +++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml @@ -17,7 +17,11 @@ + message-converter="converter"> + + + + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 8b1ec9251b..f785945b57 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -24,17 +24,14 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; -import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.message.MessageHandler; -import org.springframework.integration.scheduling.Trigger; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionDefinition; import org.springframework.util.Assert; /** @@ -48,20 +45,10 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar private volatile String inputChannelName; - private volatile Trigger trigger; - - private volatile int maxMessagesPerPoll; - - private volatile long receiveTimeout = 1000; + private volatile PollerMetadata pollerMetadata; private volatile boolean autoStartup = true; - private volatile TaskExecutor taskExecutor; - - private volatile PlatformTransactionManager transactionManager; - - private volatile TransactionDefinition transactionDefinition; - private volatile ConfigurableBeanFactory beanFactory; private volatile AbstractEndpoint endpoint; @@ -81,34 +68,14 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar this.inputChannelName = inputChannelName; } - public void setTrigger(Trigger trigger) { - this.trigger = trigger; - } - - public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { - this.maxMessagesPerPoll = maxMessagesPerPoll; - } - - public void setReceiveTimeout(long receiveTimeout) { - this.receiveTimeout = receiveTimeout; + public void setPollerMetadata(PollerMetadata pollerMetadata) { + this.pollerMetadata = pollerMetadata; } public void setAutoStartup(boolean autoStartup) { this.autoStartup = autoStartup; } - public void setTaskExecutor(TaskExecutor taskExecutor) { - this.taskExecutor = taskExecutor; - } - - public void setTransactionManager(PlatformTransactionManager transactionManager) { - this.transactionManager = transactionManager; - } - - public void setTransactionDefinition(TransactionDefinition transactionDefinition) { - this.transactionDefinition = transactionDefinition; - } - public void setBeanName(String beanName) { this.beanName = beanName; } @@ -152,19 +119,24 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar MessageChannel channel = (MessageChannel) this.beanFactory.getBean(this.inputChannelName, MessageChannel.class); if (channel instanceof SubscribableChannel) { - Assert.isNull(trigger, "A trigger should not be specified for endpoint '" + this.beanName + Assert.isNull(this.pollerMetadata, "A poller should not be specified for endpoint '" + this.beanName + "', since '" + this.inputChannelName + "' is a SubscribableChannel (not pollable)."); this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler); } else if (channel instanceof PollableChannel) { PollingConsumer pollingConsumer = new PollingConsumer( (PollableChannel) channel, this.handler); - pollingConsumer.setTrigger(this.trigger); - pollingConsumer.setMaxMessagesPerPoll(this.maxMessagesPerPoll); - pollingConsumer.setReceiveTimeout(this.receiveTimeout); - pollingConsumer.setTaskExecutor(this.taskExecutor); - pollingConsumer.setTransactionManager(this.transactionManager); - pollingConsumer.setTransactionDefinition(this.transactionDefinition); + if (this.pollerMetadata == null) { + this.pollerMetadata = IntegrationContextUtils.getDefaultPollerMetadata(this.beanFactory); + Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + + this.beanName + "', and no default poller is available within the context."); + } + pollingConsumer.setTrigger(this.pollerMetadata.getTrigger()); + pollingConsumer.setMaxMessagesPerPoll(this.pollerMetadata.getMaxMessagesPerPoll()); + pollingConsumer.setReceiveTimeout(this.pollerMetadata.getReceiveTimeout()); + pollingConsumer.setTaskExecutor(this.pollerMetadata.getTaskExecutor()); + pollingConsumer.setTransactionManager(this.pollerMetadata.getTransactionManager()); + pollingConsumer.setTransactionDefinition(this.pollerMetadata.getTransactionDefinition()); this.endpoint = pollingConsumer; } else { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/PollerMetadata.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/PollerMetadata.java new file mode 100644 index 0000000000..f0150dca6a --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/PollerMetadata.java @@ -0,0 +1,90 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.scheduling.Trigger; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; + +/** + * @author Mark Fisher + */ +public class PollerMetadata { + + private volatile Trigger trigger; + + private volatile int maxMessagesPerPoll; + + private volatile long receiveTimeout = 1000; + + private volatile TaskExecutor taskExecutor; + + private volatile PlatformTransactionManager transactionManager; + + private volatile TransactionDefinition transactionDefinition; + + + public void setTrigger(Trigger trigger) { + this.trigger = trigger; + } + + public Trigger getTrigger() { + return this.trigger; + } + + public void setMaxMessagesPerPoll(int maxMessagesPerPoll) { + this.maxMessagesPerPoll = maxMessagesPerPoll; + } + + public int getMaxMessagesPerPoll() { + return this.maxMessagesPerPoll; + } + + public void setReceiveTimeout(long receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public long getReceiveTimeout() { + return this.receiveTimeout; + } + + public void setTaskExecutor(TaskExecutor taskExecutor) { + this.taskExecutor = taskExecutor; + } + + public TaskExecutor getTaskExecutor() { + return this.taskExecutor; + } + + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + public PlatformTransactionManager getTransactionManager() { + return this.transactionManager; + } + + public void setTransactionDefinition(TransactionDefinition transactionDefinition) { + this.transactionDefinition = transactionDefinition; + } + + public TransactionDefinition getTransactionDefinition() { + return this.transactionDefinition; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java index 2c174f5907..47e8e9dd86 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java @@ -86,13 +86,7 @@ public abstract class AbstractConsumerEndpointParser extends AbstractBeanDefinit builder.addPropertyValue("inputChannelName", inputChannelName); Element pollerElement = DomUtils.getChildElementByTagName(element, POLLER_ELEMENT); if (pollerElement != null) { - IntegrationNamespaceUtils.configureTrigger(pollerElement, builder, parserContext); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, pollerElement, "max-messages-per-poll"); - Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); - if (txElement != null) { - IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder); - } - IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, pollerElement, "task-executor"); + IntegrationNamespaceUtils.configurePollerMetadata(pollerElement, builder, parserContext); } IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "auto-startup"); return builder.getBeanDefinition(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractOutboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractOutboundChannelAdapterParser.java index 20b614e190..b5182bc29b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractOutboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractOutboundChannelAdapterParser.java @@ -40,11 +40,7 @@ public abstract class AbstractOutboundChannelAdapterParser extends AbstractChann builder.addConstructorArgReference(this.parseAndRegisterConsumer(element, parserContext)); if (pollerElement != null) { Assert.hasText(channelName, "outbound channel adapter with a 'poller' requires a 'channel' to poll"); - IntegrationNamespaceUtils.configureTrigger(pollerElement, builder, parserContext); - Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); - if (txElement != null) { - IntegrationNamespaceUtils.configureTransactionAttributes(txElement, builder); - } + IntegrationNamespaceUtils.configurePollerMetadata(pollerElement, builder, parserContext); } builder.addPropertyValue("inputChannelName", channelName); return builder.getBeanDefinition(); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java index de8d7fe9a9..1f8c38290a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java @@ -21,6 +21,7 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.PollerMetadata; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.util.Assert; @@ -33,35 +34,33 @@ import org.springframework.util.xml.DomUtils; */ public abstract class AbstractPollingInboundChannelAdapterParser extends AbstractChannelAdapterParser { + private volatile PollerMetadata defaultPollerMetadata; + + @Override protected AbstractBeanDefinition doParse(Element element, ParserContext parserContext, String channelName) { String source = this.parseSource(element, parserContext); Assert.hasText(source, "failed to parse source"); - Element pollerElement = DomUtils.getChildElementByTagName(element, "poller"); BeanDefinitionBuilder adapterBuilder = BeanDefinitionBuilder.genericBeanDefinition(SourcePollingChannelAdapter.class); adapterBuilder.addPropertyReference("source", source); adapterBuilder.addPropertyReference("outputChannel", channelName); + Element pollerElement = DomUtils.getChildElementByTagName(element, "poller"); if (pollerElement != null) { - IntegrationNamespaceUtils.configureTrigger(pollerElement, adapterBuilder, parserContext); - IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, pollerElement, "max-messages-per-poll"); - Element txElement = DomUtils.getChildElementByTagName(pollerElement, "transactional"); - if (txElement != null) { - IntegrationNamespaceUtils.configureTransactionAttributes(txElement, adapterBuilder); - } + IntegrationNamespaceUtils.configurePollerMetadata(pollerElement, adapterBuilder, parserContext); } else { - adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval())); + adapterBuilder.addPropertyValue("pollerMetadata", this.getDefaultPollerMetadata()); } IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup"); return adapterBuilder.getBeanDefinition(); } - /** - * Subclasses may override this to provide the default poll interval (when - * no 'trigger' is configured). Otherwise, the value will be 1 second. - */ - protected int getDefaultPollInterval() { - return 1000; + private synchronized PollerMetadata getDefaultPollerMetadata() { + if (this.defaultPollerMetadata == null) { + this.defaultPollerMetadata = new PollerMetadata(); + this.defaultPollerMetadata.setTrigger(new IntervalTrigger(1000)); + } + return this.defaultPollerMetadata; } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java index 97a77742b8..951b26a40d 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java @@ -112,6 +112,7 @@ public class IntegrationNamespaceHandler implements NamespaceHandler { registerBeanDefinitionParser("bridge", new BridgeParser()); registerBeanDefinitionParser("chain", new ChainParser()); registerBeanDefinitionParser("selector-chain", new SelectorChainParser()); + registerBeanDefinitionParser("poller", new PollerParser()); registerBeanDefinitionParser("annotation-config", new AnnotationConfigParser()); registerBeanDefinitionParser("application-event-multicaster", new ApplicationEventMulticasterParser()); registerBeanDefinitionParser("thread-pool-task-executor", new ThreadPoolTaskExecutorParser()); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java index 7fdb4107ae..ea47efaeac 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java @@ -16,23 +16,19 @@ package org.springframework.integration.config.xml; -import java.util.concurrent.TimeUnit; - import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.parsing.BeanComponentDefinition; +import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.core.Conventions; -import org.springframework.integration.scheduling.CronTrigger; -import org.springframework.integration.scheduling.IntervalTrigger; -import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.Assert; import org.springframework.util.StringUtils; -import org.springframework.util.xml.DomUtils; /** * Shared utility methods for integration namespace parsers. @@ -137,63 +133,30 @@ public abstract class IntegrationNamespaceUtils { } /** - * Parse a "poller" element to create a Trigger and add it to the property values of the target builder. + * Parse a "poller" element to provide a reference for the target + * BeanDefinitionBuilder. If the poller element does not contain a "ref" + * attribute, this will create and register a PollerMetadata instance and + * then add it as a property reference of the target builder. * * @param pollerElement the "poller" element to parse * @param targetBuilder the builder that expects the "trigger" property + * @param parserContext the parserContext for the target builder */ - public static void configureTrigger(Element pollerElement, BeanDefinitionBuilder targetBuilder, ParserContext parserContext) { - String triggerBeanName = null; - Element intervalElement = DomUtils.getChildElementByTagName(pollerElement, "interval-trigger"); - if (intervalElement != null) { - triggerBeanName = parseIntervalTrigger(intervalElement, parserContext); + public static void configurePollerMetadata(Element pollerElement, BeanDefinitionBuilder targetBuilder, ParserContext parserContext) { + String pollerMetadataRef = null; + if (pollerElement.hasAttribute("ref")) { + pollerMetadataRef = pollerElement.getAttribute("ref"); } else { - Element cronElement = DomUtils.getChildElementByTagName(pollerElement, "cron-trigger"); - Assert.notNull(cronElement, - "A element must include either an or child element."); - triggerBeanName = parseCronTrigger(cronElement, parserContext); + ParserContext childContext = new ParserContext( + parserContext.getReaderContext(), parserContext.getDelegate(), targetBuilder.getBeanDefinition()); + BeanDefinition beanDefinition = new PollerParser().parse(pollerElement, childContext); + Assert.notNull(beanDefinition, "BeanDefinition must not be null"); + Assert.isInstanceOf(AbstractBeanDefinition.class, beanDefinition); + pollerMetadataRef = BeanDefinitionReaderUtils.registerWithGeneratedName( + (AbstractBeanDefinition) beanDefinition, parserContext.getRegistry()); } - targetBuilder.addPropertyReference("trigger", triggerBeanName); - } - - private static String parseIntervalTrigger(Element element, ParserContext parserContext) { - String interval = element.getAttribute("interval"); - Assert.hasText(interval, "the 'interval' attribute is required for an "); - TimeUnit timeUnit = TimeUnit.valueOf(element.getAttribute("time-unit")); - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(IntervalTrigger.class); - builder.addConstructorArgValue(interval); - builder.addConstructorArgValue(timeUnit); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "initial-delay"); - IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "fixed-rate"); - return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); - } - - private static String parseCronTrigger(Element element, ParserContext parserContext) { - String cronExpression = element.getAttribute("expression"); - Assert.hasText(cronExpression, "the 'expression' attribute is required for a "); - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CronTrigger.class); - builder.addConstructorArgValue(cronExpression); - return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); - } - - /** - * Parse a "transactional" element and configure the "transactionManager" and "transactionDefinition" - * properties for the target builder. - * - * @param txElement the "transactional" element to parse - * @param targetBuilder the builder that expects the "transactionManager" and "transactionDefinition" properties - */ - public static void configureTransactionAttributes(Element txElement, BeanDefinitionBuilder targetBuilder) { - targetBuilder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager")); - DefaultTransactionDefinition txDefinition = new DefaultTransactionDefinition(); - txDefinition.setPropagationBehaviorName( - DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation")); - txDefinition.setIsolationLevelName( - DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation")); - txDefinition.setTimeout(Integer.valueOf(txElement.getAttribute("timeout"))); - txDefinition.setReadOnly(txElement.getAttribute("read-only").equalsIgnoreCase("true")); - targetBuilder.addPropertyValue("transactionDefinition", txDefinition); + targetBuilder.addPropertyReference("pollerMetadata", pollerMetadataRef); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PollerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PollerParser.java new file mode 100644 index 0000000000..eba65a5c88 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/PollerParser.java @@ -0,0 +1,125 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config.xml; + +import java.util.concurrent.TimeUnit; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.BeanDefinitionStoreException; +import org.springframework.beans.factory.support.AbstractBeanDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.xml.AbstractBeanDefinitionParser; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.integration.config.PollerMetadata; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.scheduling.CronTrigger; +import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.util.Assert; +import org.springframework.util.xml.DomUtils; + +/** + * Parser for the <poller> element. + * + * @author Mark Fisher + */ +public class PollerParser extends AbstractBeanDefinitionParser { + + @Override + protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) throws BeanDefinitionStoreException { + String id = super.resolveId(element, definition, parserContext); + if (element.getAttribute("default").equals("true")) { + Assert.isTrue(!parserContext.getRegistry().isBeanNameInUse(IntegrationContextUtils.DEFAULT_POLLER_METADATA_BEAN_NAME), + "only one default element is allowed per context"); + parserContext.getRegistry().registerAlias(id, IntegrationContextUtils.DEFAULT_POLLER_METADATA_BEAN_NAME); + } + return id; + } + + @Override + protected AbstractBeanDefinition parseInternal(Element element, ParserContext parserContext) { + BeanDefinitionBuilder metadataBuilder = BeanDefinitionBuilder.genericBeanDefinition(PollerMetadata.class); + configureTrigger(element, metadataBuilder, parserContext); + IntegrationNamespaceUtils.setValueIfAttributeDefined(metadataBuilder, element, "max-messages-per-poll"); + Element txElement = DomUtils.getChildElementByTagName(element, "transactional"); + if (txElement != null) { + configureTransactionAttributes(txElement, metadataBuilder); + } + IntegrationNamespaceUtils.setReferenceIfAttributeDefined(metadataBuilder, element, "task-executor"); + return metadataBuilder.getBeanDefinition(); + } + + private void configureTrigger(Element pollerElement, BeanDefinitionBuilder targetBuilder, ParserContext parserContext) { + String triggerBeanName = null; + Element intervalElement = DomUtils.getChildElementByTagName(pollerElement, "interval-trigger"); + if (intervalElement != null) { + triggerBeanName = parseIntervalTrigger(intervalElement, parserContext); + } + else { + Element cronElement = DomUtils.getChildElementByTagName(pollerElement, "cron-trigger"); + Assert.notNull(cronElement, + "A element must include either an or child element."); + triggerBeanName = parseCronTrigger(cronElement, parserContext); + } + targetBuilder.addPropertyReference("trigger", triggerBeanName); + } + + /** + * Parse an "interval-trigger" element + */ + private String parseIntervalTrigger(Element element, ParserContext parserContext) { + String interval = element.getAttribute("interval"); + Assert.hasText(interval, "the 'interval' attribute is required for an "); + TimeUnit timeUnit = TimeUnit.valueOf(element.getAttribute("time-unit")); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(IntervalTrigger.class); + builder.addConstructorArgValue(interval); + builder.addConstructorArgValue(timeUnit); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "initial-delay"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "fixed-rate"); + return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); + } + + /** + * Parse a "cron-trigger" element + */ + private String parseCronTrigger(Element element, ParserContext parserContext) { + String cronExpression = element.getAttribute("expression"); + Assert.hasText(cronExpression, "the 'expression' attribute is required for a "); + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(CronTrigger.class); + builder.addConstructorArgValue(cronExpression); + return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); + } + + /** + * Parse a "transactional" element and configure the "transactionManager" and "transactionDefinition" + * properties for the target builder. + */ + private void configureTransactionAttributes(Element txElement, BeanDefinitionBuilder targetBuilder) { + targetBuilder.addPropertyReference("transactionManager", txElement.getAttribute("transaction-manager")); + DefaultTransactionDefinition txDefinition = new DefaultTransactionDefinition(); + txDefinition.setPropagationBehaviorName( + DefaultTransactionDefinition.PREFIX_PROPAGATION + txElement.getAttribute("propagation")); + txDefinition.setIsolationLevelName( + DefaultTransactionDefinition.PREFIX_ISOLATION + txElement.getAttribute("isolation")); + txDefinition.setTimeout(Integer.valueOf(txElement.getAttribute("timeout"))); + txDefinition.setReadOnly(txElement.getAttribute("read-only").equalsIgnoreCase("true")); + targetBuilder.addPropertyValue("transactionDefinition", txDefinition); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index 1cba09b9b5..ebdd4b7bd8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -164,7 +164,7 @@ - + @@ -230,7 +230,7 @@ - + @@ -317,26 +317,48 @@ + + + Defines a top-level poller. + + - - - Defines a poller. - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + Defines the configuration metadata for a poller. + + + + + + + + + + + + + + + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java index b3c44c52af..f54534ffb6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -19,6 +19,7 @@ package org.springframework.integration.context; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import org.springframework.beans.factory.BeanFactory; +import org.springframework.integration.config.PollerMetadata; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; @@ -35,6 +36,8 @@ public abstract class IntegrationContextUtils { public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"; + public static final String DEFAULT_POLLER_METADATA_BEAN_NAME = "org.springframework.integration.context.defaultPollerMetadata"; + public static MessageChannel getErrorChannel(BeanFactory beanFactory) { return getBeanOfType(beanFactory, ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); @@ -50,6 +53,10 @@ public abstract class IntegrationContextUtils { return taskScheduler; } + public static PollerMetadata getDefaultPollerMetadata(BeanFactory beanFactory) { + return getBeanOfType(beanFactory, DEFAULT_POLLER_METADATA_BEAN_NAME, PollerMetadata.class); + } + @SuppressWarnings("unchecked") private static T getBeanOfType(BeanFactory beanFactory, String beanName, Class type) { if (!beanFactory.containsBean(beanName)) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index a6c5f134c0..af07269299 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -147,13 +147,13 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement this.transactionTemplate = new TransactionTemplate( this.transactionManager, this.transactionDefinition); } - this.poller = this.createPoller(); if (this.taskExecutor != null && !(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) { if (this.errorHandler == null) { this.errorHandler = new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(getBeanFactory())); } this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler); } + this.poller = this.createPoller(); this.initialized = true; } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index a668ed7b7a..a2c0a92c1a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -17,6 +17,7 @@ package org.springframework.integration.endpoint; import org.springframework.integration.channel.MessageChannelTemplate; +import org.springframework.integration.config.PollerMetadata; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.message.MessageSource; @@ -59,6 +60,14 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint { this.channelTemplate.setSendTimeout(sendTimeout); } + public void setPollerMetadata(PollerMetadata pollerMetadata) { + this.setTrigger(pollerMetadata.getTrigger()); + this.setMaxMessagesPerPoll(pollerMetadata.getMaxMessagesPerPoll()); + this.setTaskExecutor(pollerMetadata.getTaskExecutor()); + this.setTransactionDefinition(pollerMetadata.getTransactionDefinition()); + this.setTransactionManager(pollerMetadata.getTransactionManager()); + } + @Override protected void onInit() { Assert.notNull(this.source, "source must not be null"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml index d3dec63e2a..01c197d28e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/endpoint/returnAddressTests.xml @@ -7,6 +7,10 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> + + + +