Added namespace support for Endpoint interceptors including TransactionInterceptor (INT-85).

This commit is contained in:
Mark Fisher
2008-07-01 13:59:02 +00:00
parent 908966d975
commit 486df85a68
11 changed files with 206 additions and 43 deletions

View File

@@ -12,5 +12,6 @@
<classpathentry kind="var" path="IVY_CACHE/org.springframework/org.springframework.context/2.5.4.A/org.springframework.context-2.5.4.A.jar" sourcepath="/IVY_CACHE/org.springframework/org.springframework.context/2.5.4.A/org.springframework.context-sources-2.5.4.A.jar"/>
<classpathentry kind="var" path="IVY_CACHE/org.springframework/org.springframework.context.support/2.5.4.A/org.springframework.context.support-2.5.4.A.jar" sourcepath="/IVY_CACHE/org.springframework/org.springframework.context.support/2.5.4.A/org.springframework.context.support-sources-2.5.4.A.jar"/>
<classpathentry kind="var" path="IVY_CACHE/org.springframework/org.springframework.core/2.5.4.A/org.springframework.core-2.5.4.A.jar" sourcepath="/IVY_CACHE/org.springframework/org.springframework.core/2.5.4.A/org.springframework.core-sources-2.5.4.A.jar"/>
<classpathentry kind="var" path="IVY_CACHE/org.springframework/org.springframework.transaction/2.5.4.A/org.springframework.transaction-2.5.4.A.jar" sourcepath="/IVY_CACHE/org.springframework/org.springframework.transaction/2.5.4.A/org.springframework.transaction-sources-2.5.4.A.jar"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>

View File

@@ -23,6 +23,7 @@
<dependency org="org.junit" name="com.springsource.org.junit" rev="4.4.0" conf="test->runtime"/>
<dependency org="org.springframework" name="org.springframework.aop" rev="2.5.4.A" conf="compile->runtime"/>
<dependency org="org.springframework" name="org.springframework.context" rev="2.5.4.A" conf="compile->runtime"/>
<dependency org="org.springframework" name="org.springframework.transaction" rev="2.5.4.A" conf="compile->runtime"/>
</dependencies>
</ivy-module>

View File

@@ -62,7 +62,7 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef
private static final String CONCURRENCY_POLICY_PROPERTY = "concurrencyPolicy";
private static final String ADVICE_CHAIN_ELEMENT = "advice-chain";
private static final String INTERCEPTORS_ELEMENT = "interceptors";
@Override
@@ -101,9 +101,10 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef
else if (SCHEDULE_ELEMENT.equals(localName)) {
schedule = this.parseSchedule((Element) child);
}
else if (ADVICE_CHAIN_ELEMENT.equals(localName)) {
ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain((Element) child);
builder.addPropertyValue("adviceChain", adviceChain);
else if (INTERCEPTORS_ELEMENT.equals(localName)) {
ManagedList interceptors = IntegrationNamespaceUtils.parseEndpointInterceptors(
(Element) child, parserContext);
builder.addPropertyValue("interceptors", interceptors);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,14 +16,28 @@
package org.springframework.integration.config;
import java.util.LinkedList;
import java.util.List;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
import org.springframework.beans.factory.support.ManagedList;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.endpoint.ConcurrencyPolicy;
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
import org.springframework.transaction.interceptor.NoRollbackRuleAttribute;
import org.springframework.transaction.interceptor.RollbackRuleAttribute;
import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.util.StringUtils;
/**
@@ -65,20 +79,86 @@ public abstract class IntegrationNamespaceUtils {
}
@SuppressWarnings("unchecked")
public static ManagedList parseEndpointAdviceChain(Element element) {
ManagedList adviceChain = new ManagedList();
public static ManagedList parseEndpointInterceptors(Element element, ParserContext parserContext) {
ManagedList interceptors = new ManagedList();
NodeList childNodes = element.getChildNodes();
for (int i = 0; i < childNodes.getLength(); i++) {
Node child = childNodes.item(i);
if (child.getNodeType() == Node.ELEMENT_NODE) {
Element childElement = (Element) child;
String localName = child.getLocalName();
if ("ref".equals(localName)) {
String ref = ((Element) child).getAttribute("bean");
adviceChain.add(new RuntimeBeanReference(ref));
if ("bean".equals(localName)) {
BeanDefinitionParserDelegate beanParser = new BeanDefinitionParserDelegate(parserContext.getReaderContext());
beanParser.initDefaults(childElement.getOwnerDocument().getDocumentElement());
BeanDefinitionHolder beanDefinitionHolder = beanParser.parseBeanDefinitionElement(childElement);
parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinitionHolder));
interceptors.add(new RuntimeBeanReference(beanDefinitionHolder.getBeanName()));
}
else if ("ref".equals(localName)) {
String ref = childElement.getAttribute("bean");
interceptors.add(new RuntimeBeanReference(ref));
}
else if ("transaction-interceptor".equals(localName)) {
String txInterceptorBeanName = parseTransactionInterceptor(childElement, parserContext);
interceptors.add(new RuntimeBeanReference(txInterceptorBeanName));
}
}
}
return adviceChain;
return interceptors;
}
private static String parseTransactionInterceptor(Element element, ParserContext parserContext) {
BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransactionInterceptor.class);
String txManagerRef = element.getAttribute("transaction-manager");
if (!StringUtils.hasText(txManagerRef)) {
txManagerRef = "transactionManager";
}
builder.addPropertyReference("transactionManager", txManagerRef);
RuleBasedTransactionAttribute attribute = new RuleBasedTransactionAttribute();
String propagation = element.getAttribute("propagation");
String isolation = element.getAttribute("isolation");
String timeout = element.getAttribute("timeout");
String readOnly = element.getAttribute("read-only");
if (StringUtils.hasText(propagation)) {
attribute.setPropagationBehaviorName(RuleBasedTransactionAttribute.PREFIX_PROPAGATION + propagation);
}
if (StringUtils.hasText(isolation)) {
attribute.setIsolationLevelName(RuleBasedTransactionAttribute.PREFIX_ISOLATION + isolation);
}
if (StringUtils.hasText(timeout)) {
try {
attribute.setTimeout(Integer.parseInt(timeout));
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Timeout must be an integer value: [" + timeout + "]", element);
}
}
if (StringUtils.hasText(readOnly)) {
attribute.setReadOnly(Boolean.valueOf(readOnly).booleanValue());
}
List rollbackRules = new LinkedList();
if (element.hasAttribute("rollback-for")) {
String rollbackForValue = element.getAttribute("rollback-for");
String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(rollbackForValue);
for (int i = 0; i < exceptionTypeNames.length; i++) {
rollbackRules.add(new RollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i])));
}
}
if (element.hasAttribute("no-rollback-for")) {
String noRollbackForValue = element.getAttribute("no-rollback-for");
String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(noRollbackForValue);
for (int i = 0; i < exceptionTypeNames.length; i++) {
rollbackRules.add(new NoRollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i])));
}
}
attribute.setRollbackRules(rollbackRules);
RootBeanDefinition attributeSourceDefinition = new RootBeanDefinition(MatchAlwaysTransactionAttributeSource.class);
attributeSourceDefinition.setSource(parserContext.extractSource(element));
attributeSourceDefinition.getPropertyValues().addPropertyValue("transactionAttribute", attribute);
String attributeSourceBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName(
attributeSourceDefinition, parserContext.getRegistry());
builder.addPropertyReference("transactionAttributeSource", attributeSourceBeanName);
return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry());
}
/**

View File

@@ -29,7 +29,7 @@ import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.message.Message;
/**
* A post-processor that applies an advice-chain by creating a proxy for an endpoint.
* A post-processor that applies interceptors by creating a proxy for an endpoint.
*
* @author Mark Fisher
*/
@@ -42,11 +42,11 @@ public class MessageEndpointBeanPostProcessor implements BeanPostProcessor {
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AbstractEndpoint) {
AbstractEndpoint endpoint = (AbstractEndpoint) bean;
List<Advice> adviceChain = endpoint.getAdviceChain();
if (adviceChain.size() > 0) {
List<Advice> interceptors = endpoint.getInterceptors();
if (interceptors.size() > 0) {
ProxyFactory proxyFactory = new ProxyFactory(endpoint);
for (Advice advice : adviceChain) {
proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(advice));
for (Advice interceptor : interceptors) {
proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(interceptor));
}
bean = proxyFactory.getProxy();
}

View File

@@ -69,10 +69,11 @@ public class SourceEndpointParser extends AbstractSimpleBeanDefinitionParser {
throw new ConfigurationException("The <schedule/> sub-element is required for a <source-endpoint/>.");
}
builder.addPropertyValue("schedule", this.parseSchedule(scheduleElement));
Element adviceChainElement = DomUtils.getChildElementByTagName(element, "advice-chain");
if (adviceChainElement != null) {
ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain(adviceChainElement);
builder.addPropertyValue("adviceChain", adviceChain);
Element interceptorsElement = DomUtils.getChildElementByTagName(element, "interceptors");
if (interceptorsElement != null) {
ManagedList interceptors = IntegrationNamespaceUtils.parseEndpointInterceptors(
interceptorsElement, parserContext);
builder.addPropertyValue("interceptors", interceptors);
}
}

View File

@@ -166,7 +166,7 @@
<xsd:extension base="beans:identifiedType">
<xsd:sequence>
<xsd:element ref="schedule" minOccurs="1" maxOccurs="1"/>
<xsd:element name="advice-chain" type="adviceChainType" minOccurs="0" maxOccurs="1"/>
<xsd:element name="interceptors" type="interceptorsType" minOccurs="0" maxOccurs="1"/>
</xsd:sequence>
<xsd:attribute name="source" type="xsd:string" use="required"/>
<xsd:attribute name="channel" type="xsd:string" use="required"/>
@@ -416,7 +416,7 @@
<xsd:all>
<xsd:element ref="schedule" minOccurs="0" maxOccurs="1"/>
<xsd:element name="concurrency" type="concurrencyType" minOccurs="0" maxOccurs="1"/>
<xsd:element name="advice-chain" type="adviceChainType" minOccurs="0" maxOccurs="1"/>
<xsd:element name="interceptors" type="interceptorsType" minOccurs="0" maxOccurs="1"/>
</xsd:all>
<xsd:attribute name="input-channel" type="xsd:string" use="required"/>
<xsd:attribute name="error-handler" type="xsd:string"/>
@@ -439,19 +439,97 @@
</xsd:complexContent>
</xsd:complexType>
<xsd:complexType name="adviceChainType">
<xsd:complexType name="interceptorsType">
<xsd:annotation>
<xsd:documentation>
Defines a list of Advice.
Defines a list of interceptors. Each element may be an EndpointInterceptor or any Advice instance.
</xsd:documentation>
</xsd:annotation>
<xsd:sequence>
<xsd:element name="ref" minOccurs="0" maxOccurs="unbounded">
<xsd:complexType>
<xsd:attribute name="bean" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:choice minOccurs="0" maxOccurs="unbounded">
<xsd:element name="ref" minOccurs="0" maxOccurs="unbounded">
<xsd:complexType>
<xsd:attribute name="bean" use="required"/>
</xsd:complexType>
</xsd:element>
<xsd:element name="transaction-interceptor" type="transactionInterceptorType" minOccurs="0" maxOccurs="1"/>
<xsd:any namespace="##other" processContents="strict" minOccurs="0" maxOccurs="unbounded"/>
</xsd:choice>
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="transactionInterceptorType">
<xsd:attribute name="transaction-manager" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The bean name of the PlatformTransactionManager to use. The default is "transactionManager".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="propagation" default="REQUIRED">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.transaction.annotation.Propagation"><![CDATA[
The transaction propagation behavior.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="REQUIRED"/>
<xsd:enumeration value="SUPPORTS"/>
<xsd:enumeration value="MANDATORY"/>
<xsd:enumeration value="REQUIRES_NEW"/>
<xsd:enumeration value="NOT_SUPPORTED"/>
<xsd:enumeration value="NEVER"/>
<xsd:enumeration value="NESTED"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="isolation" default="DEFAULT">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.transaction.annotation.Isolation"><![CDATA[
The transaction isolation level.
]]></xsd:documentation>
</xsd:annotation>
<xsd:simpleType>
<xsd:restriction base="xsd:string">
<xsd:enumeration value="DEFAULT"/>
<xsd:enumeration value="READ_UNCOMMITTED"/>
<xsd:enumeration value="READ_COMMITTED"/>
<xsd:enumeration value="REPEATABLE_READ"/>
<xsd:enumeration value="SERIALIZABLE"/>
</xsd:restriction>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="timeout" type="xsd:integer" default="-1">
<xsd:annotation>
<xsd:documentation><![CDATA[
The transaction timeout value (in seconds).
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="read-only" type="xsd:boolean" default="false">
<xsd:annotation>
<xsd:documentation><![CDATA[
Is this transaction read-only?
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="rollback-for" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The Exception(s) that will trigger rollback; comma-delimited.
For example, 'com.foo.MyBusinessException,ServletException'
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="no-rollback-for" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The Exception(s) that will *not* trigger rollback; comma-delimited.
For example, 'com.foo.MyBusinessException,ServletException'
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:schema>

View File

@@ -40,7 +40,7 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware
private volatile String name;
private final List<Advice> adviceChain = new ArrayList<Advice>();
private final List<Advice> interceptors = new ArrayList<Advice>();
public String getName() {
@@ -59,23 +59,23 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware
return (this.name != null) ? this.name : super.toString();
}
public void setAdviceChain(List<Object> adviceChain) {
for (Object advice : adviceChain) {
if (advice instanceof Advice) {
this.adviceChain.add((Advice) advice);
public void setInterceptors(List<Object> interceptors) {
for (Object interceptor : interceptors) {
if (interceptor instanceof Advice) {
this.interceptors.add((Advice) interceptor);
}
else if (advice instanceof EndpointInterceptor) {
this.adviceChain.add(new EndpointMethodInterceptor((EndpointInterceptor) advice));
else if (interceptor instanceof EndpointInterceptor) {
this.interceptors.add(new EndpointMethodInterceptor((EndpointInterceptor) interceptor));
}
else {
throw new ConfigurationException("Each adviceChain element must implement either "
throw new ConfigurationException("Each interceptor element must implement either "
+ "'" + Advice.class.getName() + "' or '" + EndpointInterceptor.class.getName() + "'.");
}
}
}
public List<Advice> getAdviceChain() {
return this.adviceChain;
public List<Advice> getInterceptors() {
return this.interceptors;
}
public final boolean invoke(Message<?> message) {

View File

@@ -22,14 +22,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.integration.handler.MessageHandler;
import org.springframework.integration.handler.MessageHandlerNotRunningException;
import org.springframework.integration.handler.MessageHandlerRejectedExecutionException;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageDeliveryException;
import org.springframework.integration.message.MessageTarget;
import org.springframework.integration.util.ErrorHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
/**

View File

@@ -14,13 +14,16 @@
* limitations under the License.
*/
package org.springframework.integration.endpoint;
package org.springframework.integration.endpoint.interceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.integration.endpoint.EndpointInterceptor;
import org.springframework.integration.message.Message;
/**
* A convenience base class for implementing {@link EndpointInterceptor EndpointInterceptors}.
*
* @author Mark Fisher
*/
public class EndpointInterceptorAdapter implements EndpointInterceptor {

View File

@@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.integration.endpoint.EndpointInterceptorAdapter;
import org.springframework.integration.endpoint.interceptor.EndpointInterceptorAdapter;
/**
* @author Mark Fisher