INT-736 Added namespace support for the 'delayer' endpoint.
This commit is contained in:
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright 2002-2009 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config.xml;
|
||||
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
|
||||
import org.springframework.beans.factory.xml.ParserContext;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Parser for the <delayer> element.
|
||||
*
|
||||
* @author Mark Fisher
|
||||
* @since 1.0.3
|
||||
*/
|
||||
public class DelayerParser extends AbstractConsumerEndpointParser {
|
||||
|
||||
@Override
|
||||
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
|
||||
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
|
||||
IntegrationNamespaceUtils.BASE_PACKAGE + ".handler.DelayHandler");
|
||||
String defaultDelay = element.getAttribute("default-delay");
|
||||
if (!StringUtils.hasText(defaultDelay)) {
|
||||
parserContext.getReaderContext().error("The 'default-delay' attribute is required.", element);
|
||||
return null;
|
||||
}
|
||||
builder.getBeanDefinition().getConstructorArgumentValues().addIndexedArgumentValue(0, defaultDelay);
|
||||
String scheduler = element.getAttribute("scheduler");
|
||||
if (StringUtils.hasText(scheduler)) {
|
||||
builder.addConstructorArgReference(scheduler);
|
||||
}
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "delay-header-name");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "send-timeout");
|
||||
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "wait-for-tasks-to-complete-on-shutdown");
|
||||
return builder;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -47,6 +47,7 @@ public class IntegrationNamespaceHandler extends AbstractIntegrationNamespaceHan
|
||||
registerBeanDefinitionParser("outbound-channel-adapter", new MethodInvokingOutboundChannelAdapterParser());
|
||||
registerBeanDefinitionParser("logging-channel-adapter", new LoggingChannelAdapterParser());
|
||||
registerBeanDefinitionParser("gateway", new GatewayParser());
|
||||
registerBeanDefinitionParser("delayer", new DelayerParser());
|
||||
registerBeanDefinitionParser("bridge", new BridgeParser());
|
||||
registerBeanDefinitionParser("chain", new ChainParser());
|
||||
registerBeanDefinitionParser("selector-chain", new SelectorChainParser());
|
||||
|
||||
@@ -28,6 +28,7 @@ import org.springframework.beans.BeansException;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.beans.factory.BeanFactoryAware;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.integration.channel.BeanFactoryChannelResolver;
|
||||
import org.springframework.integration.channel.ChannelResolutionException;
|
||||
import org.springframework.integration.channel.ChannelResolver;
|
||||
@@ -65,7 +66,7 @@ import org.springframework.util.Assert;
|
||||
* @author Mark Fisher
|
||||
* @since 1.0.3
|
||||
*/
|
||||
public class DelayHandler implements MessageHandler, BeanFactoryAware, DisposableBean {
|
||||
public class DelayHandler implements MessageHandler, Ordered, BeanFactoryAware, DisposableBean {
|
||||
|
||||
private final Log logger = LogFactory.getLog(this.getClass());
|
||||
|
||||
@@ -83,6 +84,8 @@ public class DelayHandler implements MessageHandler, BeanFactoryAware, Disposabl
|
||||
|
||||
private volatile boolean waitForTasksToCompleteOnShutdown;
|
||||
|
||||
private volatile int order = Ordered.LOWEST_PRECEDENCE;
|
||||
|
||||
|
||||
/**
|
||||
* Create a DelayHandler with the given default delay. The sending of Messages after
|
||||
@@ -148,6 +151,14 @@ public class DelayHandler implements MessageHandler, BeanFactoryAware, Disposabl
|
||||
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
|
||||
}
|
||||
|
||||
public void setOrder(int order) {
|
||||
this.order = order;
|
||||
}
|
||||
|
||||
public int getOrder() {
|
||||
return this.order;
|
||||
}
|
||||
|
||||
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
|
||||
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
<xsd:element name="annotation-config">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Enables annotation support for Message Endpoints.
|
||||
Enables annotation support for Message Endpoints.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
@@ -248,9 +248,7 @@
|
||||
<xsd:element name="thread-local-channel">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Defines a channel that maintains its Messages
|
||||
on a
|
||||
thread-bound queue.
|
||||
Defines a channel that maintains its Messages on a thread-bound queue.
|
||||
</xsd:documentation>
|
||||
<xsd:appinfo>
|
||||
<tool:annotation>
|
||||
@@ -273,7 +271,7 @@
|
||||
<xsd:complexType name="channelType">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Defines a message channel.
|
||||
Defines a message channel.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:attribute name="id" type="xsd:ID" use="required" />
|
||||
@@ -303,12 +301,11 @@
|
||||
<xsd:complexType>
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Defines a Messaging Gateway.
|
||||
Defines a Messaging Gateway.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:attribute name="id" type="xsd:ID" use="required" />
|
||||
<xsd:attribute name="service-interface" type="xsd:string"
|
||||
use="required">
|
||||
<xsd:attribute name="service-interface" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:appinfo>
|
||||
<tool:annotation kind="direct">
|
||||
@@ -568,6 +565,58 @@
|
||||
</xsd:complexContent>
|
||||
</xsd:complexType>
|
||||
|
||||
<xsd:element name="delayer">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Defines an endpoint that passes a Message to the output-channel after a delay. The delay may
|
||||
be retrieved from a Message header or else fallback to the 'default-delay' of this endpoint.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
<xsd:complexType>
|
||||
<xsd:complexContent>
|
||||
<xsd:extension base="inputOutputEndpointType">
|
||||
<xsd:sequence>
|
||||
<xsd:element name="poller" type="innerPollerType"
|
||||
minOccurs="0" maxOccurs="1" />
|
||||
</xsd:sequence>
|
||||
<xsd:attribute name="default-delay" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specify the default delay in milliseconds. This value can be set to 0 if the only Messages
|
||||
that should be delayed are those with a particular header (in that case, be sure to provide
|
||||
a value for the 'delay-header-name' attribute).
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="delay-header-name" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specify the name of the header that should contain the delay value. This value can either
|
||||
represent the number of milliseconds to delay counting from the current time or it can be an
|
||||
absolute Date until which the Message should be delayed.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="send-timeout" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specify the maximum amount of time in milliseconds to wait when sending the released
|
||||
Messages (after delay) to the output channel. By default the send will block indefinitely.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="wait-for-tasks-to-complete-on-shutdown" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Specify whether tasks should be able to complete on shutdown. By default this is 'false'.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:extension>
|
||||
</xsd:complexContent>
|
||||
</xsd:complexType>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="bridge">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
@@ -620,6 +669,7 @@
|
||||
<xsd:element ref="splitter" />
|
||||
<xsd:element ref="aggregator" />
|
||||
<xsd:element ref="router" />
|
||||
<xsd:element ref="delayer" />
|
||||
<xsd:element ref="chain" />
|
||||
<xsd:element ref="poller" />
|
||||
</xsd:choice>
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans:beans xmlns="http://www.springframework.org/schema/integration"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:beans="http://www.springframework.org/schema/beans"
|
||||
xsi:schemaLocation="http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
|
||||
http://www.springframework.org/schema/integration
|
||||
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
|
||||
|
||||
<channel id="input"/>
|
||||
|
||||
<channel id="output">
|
||||
<queue />
|
||||
</channel>
|
||||
|
||||
<delayer id="delayer"
|
||||
input-channel="input"
|
||||
output-channel="output"
|
||||
default-delay="1234"
|
||||
delay-header-name="foo"
|
||||
order="99"
|
||||
send-timeout="987"
|
||||
wait-for-tasks-to-complete-on-shutdown="true"/>
|
||||
|
||||
</beans:beans>
|
||||
@@ -0,0 +1,61 @@
|
||||
/*
|
||||
* Copyright 2002-2009 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.integration.config.xml;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import org.springframework.beans.DirectFieldAccessor;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.integration.endpoint.EventDrivenConsumer;
|
||||
import org.springframework.integration.handler.DelayHandler;
|
||||
import org.springframework.test.context.ContextConfiguration;
|
||||
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
|
||||
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
* @since 1.0.3
|
||||
*/
|
||||
@RunWith(SpringJUnit4ClassRunner.class)
|
||||
@ContextConfiguration
|
||||
public class DelayerParserTests {
|
||||
|
||||
@Autowired
|
||||
private ApplicationContext context;
|
||||
|
||||
|
||||
@Test
|
||||
public void checkConfiguration() {
|
||||
Object endpoint = context.getBean("delayer");
|
||||
assertEquals(EventDrivenConsumer.class, endpoint.getClass());
|
||||
Object handler = new DirectFieldAccessor(endpoint).getPropertyValue("handler");
|
||||
assertEquals(DelayHandler.class, handler.getClass());
|
||||
DelayHandler delayHandler = (DelayHandler) handler;
|
||||
assertEquals(99, delayHandler.getOrder());
|
||||
DirectFieldAccessor accessor = new DirectFieldAccessor(delayHandler);
|
||||
assertEquals(context.getBean("output"), accessor.getPropertyValue("outputChannel"));
|
||||
assertEquals(new Long(1234), accessor.getPropertyValue("defaultDelay"));
|
||||
assertEquals("foo", accessor.getPropertyValue("delayHeaderName"));
|
||||
assertEquals(new Long(987), new DirectFieldAccessor(
|
||||
accessor.getPropertyValue("channelTemplate")).getPropertyValue("sendTimeout"));
|
||||
assertEquals(Boolean.TRUE, accessor.getPropertyValue("waitForTasksToCompleteOnShutdown"));
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user