INT-677, Added inner handler recognition to Filter, modified/re-factored Test cases structure and updated documentation

This commit is contained in:
Oleg Zhurakousky
2009-06-29 00:22:54 +00:00
parent e6062ba749
commit 39a6beefef
5 changed files with 140 additions and 63 deletions

View File

@@ -18,6 +18,7 @@ package org.springframework.integration.config.xml;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.xml.ParserContext;
@@ -27,6 +28,7 @@ import org.springframework.util.StringUtils;
* Parser for the <filter/> element.
*
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
public class FilterParser extends AbstractConsumerEndpointParser {
@@ -34,16 +36,24 @@ public class FilterParser extends AbstractConsumerEndpointParser {
protected BeanDefinitionBuilder parseHandler(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
IntegrationNamespaceUtils.BASE_PACKAGE + ".filter.MessageFilter");
builder.addConstructorArgReference(this.parseSelector(element, parserContext));
builder.addConstructorArgReference((String) this.parseSelector(element, parserContext));
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "discard-channel");
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element, "throw-exception-on-rejection");
return builder;
}
private String parseSelector(Element element, ParserContext parserContext) {
String ref = element.getAttribute("ref");
if (!StringUtils.hasText(ref)) {
parserContext.getReaderContext().error("The 'ref' attribute is required.", element);
BeanDefinition innerHandlerDefinition = this.parseInnerHandlerDefinition(element, parserContext);
String ref = null;
if (innerHandlerDefinition == null){
ref = element.getAttribute("ref");
if (!StringUtils.hasText(ref)) {
parserContext.getReaderContext().error("Either \"ref\" attribute or inner bean (<bean/>) definition of concrete implementation of " +
"this MessageFilter is required.", element);
return null;
}
}
String method = element.getAttribute("method");
if (!StringUtils.hasText(method)) {
@@ -51,10 +61,14 @@ public class FilterParser extends AbstractConsumerEndpointParser {
}
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
IntegrationNamespaceUtils.BASE_PACKAGE + ".filter.MethodInvokingSelector");
builder.addConstructorArgReference(ref);
if (innerHandlerDefinition != null){
builder.addConstructorArgValue(innerHandlerDefinition);
} else {
builder.addConstructorArgReference(ref);
}
builder.getRawBeanDefinition().getConstructorArgumentValues().addGenericArgumentValue(method, "java.lang.String");
return BeanDefinitionReaderUtils.registerWithGeneratedName(
builder.getBeanDefinition(), parserContext.getRegistry());
return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
}
}

View File

@@ -843,7 +843,7 @@
</xsd:annotation>
<xsd:complexType>
<xsd:complexContent>
<xsd:extension base="handlerEndpointType">
<xsd:extension base="innerEndpointDefinitionAware">
<xsd:attribute name="discard-channel" type="xsd:string" />
<xsd:attribute name="throw-exception-on-rejection"
type="xsd:string" default="false" />

View File

@@ -31,6 +31,7 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.xml.XmlBeanDefinitionReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.io.InputStreamResource;
import org.springframework.integration.channel.DirectChannel;
@@ -41,6 +42,7 @@ import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.message.GenericMessage;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.CollectionUtils;
@@ -48,7 +50,7 @@ import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
*
*
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration
@@ -70,11 +72,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerSplitterDefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("splitter-failure-refAndBean");
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
this.bootStrap(xmlConfig);
}
@Test
@@ -91,11 +89,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerTransformerDefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("transformer-failure-refAndBean");
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
this.bootStrap(xmlConfig);
}
@Test
@@ -112,11 +106,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerRouterDefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("router-failure-refAndBean");
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
this.bootStrap(xmlConfig);
}
@Test
@@ -132,11 +122,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerSADefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("sa-failure-refAndBean");
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
this.bootStrap(xmlConfig);
}
@Test
@@ -157,24 +143,34 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerAggregatorDefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("aggregator-failure-refAndBean");
ByteArrayInputStream stream = new ByteArrayInputStream(xmlConfig.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
this.bootStrap(xmlConfig);
}
@Test
public void testInnerFilterDefinitionSuccess(){
String configProperty = testConfigurations.getProperty("filter-inner-success");
this.testFilterDefinitionSuccess(configProperty);
}
@Test
public void testRefFilterDefinitionSuccess(){
String configProperty = testConfigurations.getProperty("filter-ref-success");
System.out.println(configProperty);
this.testFilterDefinitionSuccess(configProperty);
}
@Test(expected=BeanDefinitionStoreException.class)
public void testInnerFilterDefinitionFailureRefAndInner(){
String xmlConfig = testConfigurations.getProperty("filter-failure-refAndBean");
this.bootStrap(xmlConfig);
}
private void testSplitterDefinitionSuccess(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ApplicationContext ac = this.bootStrap(configProperty);
EventDrivenConsumer splitter = (EventDrivenConsumer) ac.getBean("testSplitter");
Assert.assertNotNull(splitter);
MessageBuilder inChannelMessageBuilder = MessageBuilder.withPayload(new String[]{"One","Two"});
Message inMessage = inChannelMessageBuilder.build();
DirectChannel inChannel = (DirectChannel) ac.getBean("inChannel");
MessageChannel inChannel = (MessageChannel) ac.getBean("inChannel");
inChannel.send(inMessage);
PollableChannel outChannel = (PollableChannel) ac.getBean("outChannel");
Assert.assertTrue(outChannel.receive().getPayload() instanceof String);
@@ -183,11 +179,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
}
private void testTransformerDefinitionSuccess(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ApplicationContext ac = this.bootStrap(configProperty);
EventDrivenConsumer transformer = (EventDrivenConsumer) ac.getBean("testTransformer");
Assert.assertNotNull(transformer);
MessageBuilder inChannelMessageBuilder = MessageBuilder.withPayload(new String[]{"One","Two"});
@@ -199,11 +191,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
Assert.assertTrue(payload.equals("One,Two"));
}
private void testRouterDefinitionSuccess(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ApplicationContext ac = this.bootStrap(configProperty);
EventDrivenConsumer splitter = (EventDrivenConsumer) ac.getBean("testRouter");
Assert.assertNotNull(splitter);
MessageBuilder inChannelMessageBuilder = MessageBuilder.withPayload("1");
@@ -219,11 +207,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
Assert.assertTrue(channel2.receive().getPayload().equals("2"));
}
private void testSADefinitionSuccess(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ApplicationContext ac = this.bootStrap(configProperty);
EventDrivenConsumer splitter = (EventDrivenConsumer) ac.getBean("testServiceActivator");
Assert.assertNotNull(splitter);
MessageBuilder inChannelMessageBuilder = MessageBuilder.withPayload("1");
@@ -234,13 +218,7 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
Assert.assertTrue(channel1.receive().getPayload().equals("1"));
}
private void testAggregatorDefinitionSuccess(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ac.refresh();
ac.start();
ApplicationContext ac = this.bootStrap(configProperty);
MessageChannel inChannel = (MessageChannel) ac.getBean("inChannel");
for (int i = 0; i < 5; i++) {
Map<String, Object> headers = stubHeaders(i, 5, 1);
@@ -249,9 +227,25 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
}
PollableChannel output = (PollableChannel) ac.getBean("outChannel");
assertEquals(0 + 1 + 2 + 3 + 4, output.receive().getPayload());
System.out.println();
}
private void testFilterDefinitionSuccess(String configProperty){
ApplicationContext ac = this.bootStrap(configProperty);
MessageChannel input = (MessageChannel) ac.getBean("inChannel");
PollableChannel output = (PollableChannel) ac.getBean("outChannel");
input.send(new StringMessage("foo"));
Message<?> reply = output.receive(0);
assertEquals("foo", reply.getPayload());
}
private ApplicationContext bootStrap(String configProperty){
ByteArrayInputStream stream = new ByteArrayInputStream(configProperty.getBytes());
GenericApplicationContext ac = new GenericApplicationContext();
XmlBeanDefinitionReader reader = new XmlBeanDefinitionReader(ac);
reader.setValidationMode(XmlBeanDefinitionReader.VALIDATION_XSD);
reader.loadBeanDefinitions(new InputStreamResource(stream));
ac.refresh();
return ac;
}
private Map<String, Object> stubHeaders(int sequenceNumber, int sequenceSize, int correllationId) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(MessageHeaders.SEQUENCE_NUMBER, sequenceNumber);
@@ -293,5 +287,11 @@ public class InnerDefinitionHandlerAwareEndpointParserTests {
return result;
}
}
public static class TestMessageFilter{
public boolean filter(String value) {
System.out.println(">>>> Filtering");
return value.equals("foo");
}
}
}

View File

@@ -263,4 +263,51 @@ aggregator-failure-refAndBean=\
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
</aggregator> \
<beans:bean id="aggregator" class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestAggregator"/> \
</beans:beans>
filter-inner-success=\
<?xml version="1.0" encoding="UTF-8"?> \
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
xsi:schemaLocation="http://www.springframework.org/schema/beans \
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
http://www.springframework.org/schema/integration \
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
<channel id="inChannel"/> \
<channel id="outChannel"> \
<queue capacity="1" /> \
</channel> \
<filter input-channel="inChannel" method="filter" output-channel="outChannel"> \
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestMessageFilter"/> \
</filter> \
</beans:beans>
filter-ref-success=\
<?xml version="1.0" encoding="UTF-8"?> \
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
xsi:schemaLocation="http://www.springframework.org/schema/beans \
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
http://www.springframework.org/schema/integration \
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
<channel id="inChannel"/> \
<channel id="outChannel"> \
<queue capacity="1"/> \
</channel> \
<filter input-channel="inChannel" ref="testFilterBean" method="filter" output-channel="outChannel"/> \
<beans:bean id="testFilterBean" class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestMessageFilter"/> \
</beans:beans>
filter-failure-refAndBean=\
<?xml version="1.0" encoding="UTF-8"?> \
<beans:beans xmlns="http://www.springframework.org/schema/integration" \
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" \
xsi:schemaLocation="http://www.springframework.org/schema/beans \
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd \
http://www.springframework.org/schema/integration \
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> \
<channel id="inChannel"/> \
<channel id="outChannel"> \
<queue capacity="1"/> \
</channel> \
<filter input-channel="inChannel" ref="testFilterBean" method="filter" output-channel="outChannel"> \
<beans:bean class="org.springframework.integration.config.xml.InnerDefinitionHandlerAwareEndpointParserTests$TestMessageFilter"/> \
</filter>
</beans:beans>

View File

@@ -52,6 +52,22 @@
alternative to the more <emphasis>proactive</emphasis> approach of using a Message Router with a single
point-to-point input channel and multiple output channels.
</note>
<para>
Using a "ref" attribute is generally recommended if the custom filter implementation can be reused in other
<code>&lt;filter&gt;</code> definitions. However if the custom filter implementation should be scoped to a
concrete definition of the <code>&lt;filter&gt;</code>, starting with v1.0.3, Spring Integration supports inner
bean definitions for custom filters within the <code>&lt;filter&gt;</code> element:
<programlisting language="xml"><![CDATA[<filter method="someMethod" input-channel="inChannel" output-channel="outChannel">
<beans:bean class="org.foo.MyCustomFilter"/>
</filter>]]></programlisting>
</para>
<note>
<para>
Using both the "ref" attribute and an inner handler definition in the same <code>&lt;filter&gt;</code> configuration
is not allowed, as it creates an ambiguous condition and will result in Exception being thrown.
</para>
</note>
</section>
</chapter>