From 76da6421d3f891040e152d4783fb01110f717648 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Sat, 18 Jul 2009 00:31:28 +0000 Subject: [PATCH] INT-736 Added namespace support for the 'delayer' endpoint. --- .../integration/config/xml/DelayerParser.java | 53 +++++++++++++++ .../xml/IntegrationNamespaceHandler.java | 1 + .../integration/handler/DelayHandler.java | 13 +++- .../config/xml/spring-integration-1.0.xsd | 66 ++++++++++++++++--- .../config/xml/DelayerParserTests-context.xml | 25 +++++++ .../config/xml/DelayerParserTests.java | 61 +++++++++++++++++ 6 files changed, 210 insertions(+), 9 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/config/xml/DelayerParser.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests-context.xml create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/DelayerParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/DelayerParser.java new file mode 100644 index 0000000000..a49ede4f44 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/DelayerParser.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2009 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config.xml; + +import org.w3c.dom.Element; + +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.xml.ParserContext; +import org.springframework.util.StringUtils; + +/** + * Parser for the <delayer> element. + * + * @author Mark Fisher + * @since 1.0.3 + */ +public class DelayerParser extends AbstractConsumerEndpointParser { + + @Override + protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition( + IntegrationNamespaceUtils.BASE_PACKAGE + ".handler.DelayHandler"); + String defaultDelay = element.getAttribute("default-delay"); + if (!StringUtils.hasText(defaultDelay)) { + parserContext.getReaderContext().error("The 'default-delay' attribute is required.", element); + return null; + } + builder.getBeanDefinition().getConstructorArgumentValues().addIndexedArgumentValue(0, defaultDelay); + String scheduler = element.getAttribute("scheduler"); + if (StringUtils.hasText(scheduler)) { + builder.addConstructorArgReference(scheduler); + } + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delay-header-name"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout"); + IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "wait-for-tasks-to-complete-on-shutdown"); + return builder; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java index 5b30f1bf1e..4f30b48b88 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceHandler.java @@ -47,6 +47,7 @@ public class IntegrationNamespaceHandler extends AbstractIntegrationNamespaceHan registerBeanDefinitionParser("outbound-channel-adapter", new MethodInvokingOutboundChannelAdapterParser()); registerBeanDefinitionParser("logging-channel-adapter", new LoggingChannelAdapterParser()); registerBeanDefinitionParser("gateway", new GatewayParser()); + registerBeanDefinitionParser("delayer", new DelayerParser()); registerBeanDefinitionParser("bridge", new BridgeParser()); registerBeanDefinitionParser("chain", new ChainParser()); registerBeanDefinitionParser("selector-chain", new SelectorChainParser()); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java index db9acb8ed6..90dfe1d627 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/handler/DelayHandler.java @@ -28,6 +28,7 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.DisposableBean; +import org.springframework.core.Ordered; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolutionException; import org.springframework.integration.channel.ChannelResolver; @@ -65,7 +66,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @since 1.0.3 */ -public class DelayHandler implements MessageHandler, BeanFactoryAware, DisposableBean { +public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, DisposableBean { private final Log logger = LogFactory.getLog(this.getClass()); @@ -83,6 +84,8 @@ public class DelayHandler implements MessageHandler, BeanFactoryAware, Disposabl private volatile boolean waitForTasksToCompleteOnShutdown; + private volatile int order = Ordered.LOWEST_PRECEDENCE; + /** * Create a DelayHandler with the given default delay. The sending of Messages after @@ -148,6 +151,14 @@ public class DelayHandler implements MessageHandler, BeanFactoryAware, Disposabl this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown; } + public void setOrder(int order) { + this.order = order; + } + + public int getOrder() { + return this.order; + } + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.channelResolver = new BeanFactoryChannelResolver(beanFactory); } diff --git a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd index f2ec4aaa91..b866f6e787 100644 --- a/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/resources/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -17,7 +17,7 @@ - Enables annotation support for Message Endpoints. + Enables annotation support for Message Endpoints. @@ -248,9 +248,7 @@ - Defines a channel that maintains its Messages - on a - thread-bound queue. + Defines a channel that maintains its Messages on a thread-bound queue. @@ -273,7 +271,7 @@ - Defines a message channel. + Defines a message channel. @@ -303,12 +301,11 @@ - Defines a Messaging Gateway. + Defines a Messaging Gateway. - + @@ -568,6 +565,58 @@ + + + + Defines an endpoint that passes a Message to the output-channel after a delay. The delay may + be retrieved from a Message header or else fallback to the 'default-delay' of this endpoint. + + + + + + + + + + + + Specify the default delay in milliseconds. This value can be set to 0 if the only Messages + that should be delayed are those with a particular header (in that case, be sure to provide + a value for the 'delay-header-name' attribute). + + + + + + + Specify the name of the header that should contain the delay value. This value can either + represent the number of milliseconds to delay counting from the current time or it can be an + absolute Date until which the Message should be delayed. + + + + + + + Specify the maximum amount of time in milliseconds to wait when sending the released + Messages (after delay) to the output channel. By default the send will block indefinitely. + + + + + + + Specify whether tasks should be able to complete on shutdown. By default this is 'false'. + + + + + + + + @@ -620,6 +669,7 @@ + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests-context.xml new file mode 100644 index 0000000000..f4a4ef1ad7 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests-context.xml @@ -0,0 +1,25 @@ + + + + + + + + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java new file mode 100644 index 0000000000..b71536d668 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java @@ -0,0 +1,61 @@ +/* + * Copyright 2002-2009 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config.xml; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.handler.DelayHandler; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Mark Fisher + * @since 1.0.3 + */ +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration +public class DelayerParserTests { + + @Autowired + private ApplicationContext context; + + + @Test + public void checkConfiguration() { + Object endpoint = context.getBean("delayer"); + assertEquals(EventDrivenConsumer.class, endpoint.getClass()); + Object handler = new DirectFieldAccessor(endpoint).getPropertyValue("handler"); + assertEquals(DelayHandler.class, handler.getClass()); + DelayHandler delayHandler = (DelayHandler) handler; + assertEquals(99, delayHandler.getOrder()); + DirectFieldAccessor accessor = new DirectFieldAccessor(delayHandler); + assertEquals(context.getBean("output"), accessor.getPropertyValue("outputChannel")); + assertEquals(new Long(1234), accessor.getPropertyValue("defaultDelay")); + assertEquals("foo", accessor.getPropertyValue("delayHeaderName")); + assertEquals(new Long(987), new DirectFieldAccessor( + accessor.getPropertyValue("channelTemplate")).getPropertyValue("sendTimeout")); + assertEquals(Boolean.TRUE, accessor.getPropertyValue("waitForTasksToCompleteOnShutdown")); + } + +}