diff --git a/org.springframework.integration/.classpath b/org.springframework.integration/.classpath index 3f112f3ff4..c6af3a7de4 100644 --- a/org.springframework.integration/.classpath +++ b/org.springframework.integration/.classpath @@ -12,5 +12,6 @@ + diff --git a/org.springframework.integration/ivy.xml b/org.springframework.integration/ivy.xml index 5cbf8d164c..7297302778 100644 --- a/org.springframework.integration/ivy.xml +++ b/org.springframework.integration/ivy.xml @@ -23,6 +23,7 @@ + \ No newline at end of file diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java index 98ed241a03..525cb1ad14 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/AbstractTargetEndpointParser.java @@ -62,7 +62,7 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef private static final String CONCURRENCY_POLICY_PROPERTY = "concurrencyPolicy"; - private static final String ADVICE_CHAIN_ELEMENT = "advice-chain"; + private static final String INTERCEPTORS_ELEMENT = "interceptors"; @Override @@ -101,9 +101,10 @@ public abstract class AbstractTargetEndpointParser extends AbstractSingleBeanDef else if (SCHEDULE_ELEMENT.equals(localName)) { schedule = this.parseSchedule((Element) child); } - else if (ADVICE_CHAIN_ELEMENT.equals(localName)) { - ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain((Element) child); - builder.addPropertyValue("adviceChain", adviceChain); + else if (INTERCEPTORS_ELEMENT.equals(localName)) { + ManagedList interceptors = IntegrationNamespaceUtils.parseEndpointInterceptors( + (Element) child, parserContext); + builder.addPropertyValue("interceptors", interceptors); } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java index d24b79c7b1..be5369f9ef 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/IntegrationNamespaceUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2007 the original author or authors. + * Copyright 2002-2008 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,14 +16,28 @@ package org.springframework.integration.config; +import java.util.LinkedList; +import java.util.List; + import org.w3c.dom.Element; import org.w3c.dom.Node; import org.w3c.dom.NodeList; +import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.config.RuntimeBeanReference; +import org.springframework.beans.factory.parsing.BeanComponentDefinition; +import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; +import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.ConcurrencyPolicy; +import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource; +import org.springframework.transaction.interceptor.NoRollbackRuleAttribute; +import org.springframework.transaction.interceptor.RollbackRuleAttribute; +import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; +import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.util.StringUtils; /** @@ -65,20 +79,86 @@ public abstract class IntegrationNamespaceUtils { } @SuppressWarnings("unchecked") - public static ManagedList parseEndpointAdviceChain(Element element) { - ManagedList adviceChain = new ManagedList(); + public static ManagedList parseEndpointInterceptors(Element element, ParserContext parserContext) { + ManagedList interceptors = new ManagedList(); NodeList childNodes = element.getChildNodes(); for (int i = 0; i < childNodes.getLength(); i++) { Node child = childNodes.item(i); if (child.getNodeType() == Node.ELEMENT_NODE) { + Element childElement = (Element) child; String localName = child.getLocalName(); - if ("ref".equals(localName)) { - String ref = ((Element) child).getAttribute("bean"); - adviceChain.add(new RuntimeBeanReference(ref)); + if ("bean".equals(localName)) { + BeanDefinitionParserDelegate beanParser = new BeanDefinitionParserDelegate(parserContext.getReaderContext()); + beanParser.initDefaults(childElement.getOwnerDocument().getDocumentElement()); + BeanDefinitionHolder beanDefinitionHolder = beanParser.parseBeanDefinitionElement(childElement); + parserContext.registerBeanComponent(new BeanComponentDefinition(beanDefinitionHolder)); + interceptors.add(new RuntimeBeanReference(beanDefinitionHolder.getBeanName())); + } + else if ("ref".equals(localName)) { + String ref = childElement.getAttribute("bean"); + interceptors.add(new RuntimeBeanReference(ref)); + } + else if ("transaction-interceptor".equals(localName)) { + String txInterceptorBeanName = parseTransactionInterceptor(childElement, parserContext); + interceptors.add(new RuntimeBeanReference(txInterceptorBeanName)); } } } - return adviceChain; + return interceptors; + } + + private static String parseTransactionInterceptor(Element element, ParserContext parserContext) { + BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(TransactionInterceptor.class); + String txManagerRef = element.getAttribute("transaction-manager"); + if (!StringUtils.hasText(txManagerRef)) { + txManagerRef = "transactionManager"; + } + builder.addPropertyReference("transactionManager", txManagerRef); + RuleBasedTransactionAttribute attribute = new RuleBasedTransactionAttribute(); + String propagation = element.getAttribute("propagation"); + String isolation = element.getAttribute("isolation"); + String timeout = element.getAttribute("timeout"); + String readOnly = element.getAttribute("read-only"); + if (StringUtils.hasText(propagation)) { + attribute.setPropagationBehaviorName(RuleBasedTransactionAttribute.PREFIX_PROPAGATION + propagation); + } + if (StringUtils.hasText(isolation)) { + attribute.setIsolationLevelName(RuleBasedTransactionAttribute.PREFIX_ISOLATION + isolation); + } + if (StringUtils.hasText(timeout)) { + try { + attribute.setTimeout(Integer.parseInt(timeout)); + } + catch (NumberFormatException ex) { + parserContext.getReaderContext().error("Timeout must be an integer value: [" + timeout + "]", element); + } + } + if (StringUtils.hasText(readOnly)) { + attribute.setReadOnly(Boolean.valueOf(readOnly).booleanValue()); + } + List rollbackRules = new LinkedList(); + if (element.hasAttribute("rollback-for")) { + String rollbackForValue = element.getAttribute("rollback-for"); + String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(rollbackForValue); + for (int i = 0; i < exceptionTypeNames.length; i++) { + rollbackRules.add(new RollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i]))); + } + } + if (element.hasAttribute("no-rollback-for")) { + String noRollbackForValue = element.getAttribute("no-rollback-for"); + String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(noRollbackForValue); + for (int i = 0; i < exceptionTypeNames.length; i++) { + rollbackRules.add(new NoRollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i]))); + } + } + attribute.setRollbackRules(rollbackRules); + RootBeanDefinition attributeSourceDefinition = new RootBeanDefinition(MatchAlwaysTransactionAttributeSource.class); + attributeSourceDefinition.setSource(parserContext.extractSource(element)); + attributeSourceDefinition.getPropertyValues().addPropertyValue("transactionAttribute", attribute); + String attributeSourceBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( + attributeSourceDefinition, parserContext.getRegistry()); + builder.addPropertyReference("transactionAttributeSource", attributeSourceBeanName); + return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); } /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java index bb475f1bec..26afeb68a8 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java @@ -29,7 +29,7 @@ import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.message.Message; /** - * A post-processor that applies an advice-chain by creating a proxy for an endpoint. + * A post-processor that applies interceptors by creating a proxy for an endpoint. * * @author Mark Fisher */ @@ -42,11 +42,11 @@ public class MessageEndpointBeanPostProcessor implements BeanPostProcessor { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof AbstractEndpoint) { AbstractEndpoint endpoint = (AbstractEndpoint) bean; - List adviceChain = endpoint.getAdviceChain(); - if (adviceChain.size() > 0) { + List interceptors = endpoint.getInterceptors(); + if (interceptors.size() > 0) { ProxyFactory proxyFactory = new ProxyFactory(endpoint); - for (Advice advice : adviceChain) { - proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(advice)); + for (Advice interceptor : interceptors) { + proxyFactory.addAdvisor(new EndpointInvokeMethodAdvisor(interceptor)); } bean = proxyFactory.getProxy(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java index e9cce14b44..0088b423c0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/SourceEndpointParser.java @@ -69,10 +69,11 @@ public class SourceEndpointParser extends AbstractSimpleBeanDefinitionParser { throw new ConfigurationException("The sub-element is required for a ."); } builder.addPropertyValue("schedule", this.parseSchedule(scheduleElement)); - Element adviceChainElement = DomUtils.getChildElementByTagName(element, "advice-chain"); - if (adviceChainElement != null) { - ManagedList adviceChain = IntegrationNamespaceUtils.parseEndpointAdviceChain(adviceChainElement); - builder.addPropertyValue("adviceChain", adviceChain); + Element interceptorsElement = DomUtils.getChildElementByTagName(element, "interceptors"); + if (interceptorsElement != null) { + ManagedList interceptors = IntegrationNamespaceUtils.parseEndpointInterceptors( + interceptorsElement, parserContext); + builder.addPropertyValue("interceptors", interceptors); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index eba8db4e30..670f50110f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -166,7 +166,7 @@ - + @@ -416,7 +416,7 @@ - + @@ -439,19 +439,97 @@ - + - Defines a list of Advice. + Defines a list of interceptors. Each element may be an EndpointInterceptor or any Advice instance. - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index d3406a997e..d4f48c7b33 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -40,7 +40,7 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware private volatile String name; - private final List adviceChain = new ArrayList(); + private final List interceptors = new ArrayList(); public String getName() { @@ -59,23 +59,23 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware return (this.name != null) ? this.name : super.toString(); } - public void setAdviceChain(List adviceChain) { - for (Object advice : adviceChain) { - if (advice instanceof Advice) { - this.adviceChain.add((Advice) advice); + public void setInterceptors(List interceptors) { + for (Object interceptor : interceptors) { + if (interceptor instanceof Advice) { + this.interceptors.add((Advice) interceptor); } - else if (advice instanceof EndpointInterceptor) { - this.adviceChain.add(new EndpointMethodInterceptor((EndpointInterceptor) advice)); + else if (interceptor instanceof EndpointInterceptor) { + this.interceptors.add(new EndpointMethodInterceptor((EndpointInterceptor) interceptor)); } else { - throw new ConfigurationException("Each adviceChain element must implement either " + throw new ConfigurationException("Each interceptor element must implement either " + "'" + Advice.class.getName() + "' or '" + EndpointInterceptor.class.getName() + "'."); } } } - public List getAdviceChain() { - return this.adviceChain; + public List getInterceptors() { + return this.interceptors; } public final boolean invoke(Message message) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ConcurrentTarget.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ConcurrentTarget.java index 3bdb9aa66f..90ec55dbf9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ConcurrentTarget.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/ConcurrentTarget.java @@ -22,14 +22,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.DisposableBean; -import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.handler.MessageHandlerRejectedExecutionException; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageDeliveryException; import org.springframework.integration.message.MessageTarget; import org.springframework.integration.util.ErrorHandler; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; /** diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java similarity index 82% rename from org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java rename to org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java index db02397dd0..fcafb5a8f4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptorAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java @@ -14,13 +14,16 @@ * limitations under the License. */ -package org.springframework.integration.endpoint; +package org.springframework.integration.endpoint.interceptor; import org.aopalliance.intercept.MethodInvocation; +import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.message.Message; /** + * A convenience base class for implementing {@link EndpointInterceptor EndpointInterceptors}. + * * @author Mark Fisher */ public class EndpointInterceptorAdapter implements EndpointInterceptor { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java index de21161317..80c0cbaebe 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java @@ -20,7 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.aopalliance.intercept.MethodInvocation; -import org.springframework.integration.endpoint.EndpointInterceptorAdapter; +import org.springframework.integration.endpoint.interceptor.EndpointInterceptorAdapter; /** * @author Mark Fisher