diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java index 2311280cf2..22f01014c6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java @@ -25,7 +25,6 @@ import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; -import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; @@ -125,7 +124,6 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { */ private void addPostProcessors(ParserContext parserContext) { this.registerMessageBusAwarePostProcessor(parserContext); - this.registerMessageEndpointPostProcessor(parserContext); } private void registerMessageBusAwarePostProcessor(ParserContext parserContext) { @@ -135,10 +133,4 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { parserContext.getRegistry().registerBeanDefinition(MESSAGE_BUS_AWARE_POST_PROCESSOR_BEAN_NAME, builder.getBeanDefinition()); } - private void registerMessageEndpointPostProcessor(ParserContext parserContext) { - BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(MessageEndpointBeanPostProcessor.class); - builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); - BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java deleted file mode 100644 index 7b8761b689..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageEndpointBeanPostProcessor.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import java.lang.reflect.Method; -import java.util.List; - -import org.aopalliance.aop.Advice; - -import org.springframework.aop.framework.ProxyFactory; -import org.springframework.aop.support.StaticMethodMatcherPointcutAdvisor; -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.config.BeanPostProcessor; -import org.springframework.integration.endpoint.AbstractEndpoint; -import org.springframework.integration.message.Message; - -/** - * A post-processor that applies interceptors by creating a proxy for an endpoint. - * - * @author Mark Fisher - */ -public class MessageEndpointBeanPostProcessor implements BeanPostProcessor { - - public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { - return bean; - } - - public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof AbstractEndpoint) { - AbstractEndpoint endpoint = (AbstractEndpoint) bean; - List interceptors = endpoint.getInterceptors(); - if (interceptors.size() > 0) { - ProxyFactory proxyFactory = new ProxyFactory(endpoint); - for (Advice interceptor : interceptors) { - proxyFactory.addAdvisor(new EndpointMethodAdvisor(interceptor)); - } - bean = proxyFactory.getProxy(); - } - } - return bean; - } - - - @SuppressWarnings("serial") - private static class EndpointMethodAdvisor extends StaticMethodMatcherPointcutAdvisor { - - EndpointMethodAdvisor(Advice advice) { - super(advice); - } - - - @SuppressWarnings("unchecked") - public boolean matches(Method method, Class clazz) { - return (method.getName().equals("invoke") || method.getName().equals("send")) - && method.getParameterTypes().length == 1 - && method.getParameterTypes()[0].equals(Message.class); - } - - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransactionInterceptorParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransactionInterceptorParser.java index 78f2269b42..dbb41e7e24 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/TransactionInterceptorParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/TransactionInterceptorParser.java @@ -16,20 +16,13 @@ package org.springframework.integration.config; -import java.util.LinkedList; -import java.util.List; - import org.w3c.dom.Element; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; -import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource; -import org.springframework.transaction.interceptor.NoRollbackRuleAttribute; -import org.springframework.transaction.interceptor.RollbackRuleAttribute; +import org.springframework.integration.endpoint.interceptor.TransactionInterceptor; import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; -import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.util.StringUtils; /** @@ -46,51 +39,28 @@ public class TransactionInterceptorParser implements BeanDefinitionRegisteringPa if (!StringUtils.hasText(txManagerRef)) { txManagerRef = "transactionManager"; } - builder.addPropertyReference("transactionManager", txManagerRef); - RuleBasedTransactionAttribute attribute = new RuleBasedTransactionAttribute(); + builder.addConstructorArgReference(txManagerRef); String propagation = element.getAttribute("propagation"); String isolation = element.getAttribute("isolation"); String timeout = element.getAttribute("timeout"); String readOnly = element.getAttribute("read-only"); if (StringUtils.hasText(propagation)) { - attribute.setPropagationBehaviorName(RuleBasedTransactionAttribute.PREFIX_PROPAGATION + propagation); + builder.addPropertyValue("propagationBehaviorName", RuleBasedTransactionAttribute.PREFIX_PROPAGATION + propagation); } if (StringUtils.hasText(isolation)) { - attribute.setIsolationLevelName(RuleBasedTransactionAttribute.PREFIX_ISOLATION + isolation); + builder.addPropertyValue("isolationLevelName", RuleBasedTransactionAttribute.PREFIX_ISOLATION + isolation); } if (StringUtils.hasText(timeout)) { try { - attribute.setTimeout(Integer.parseInt(timeout)); + builder.addPropertyValue("timeout", Integer.parseInt(timeout)); } catch (NumberFormatException ex) { parserContext.getReaderContext().error("Timeout must be an integer value: [" + timeout + "]", element); } } if (StringUtils.hasText(readOnly)) { - attribute.setReadOnly(Boolean.valueOf(readOnly).booleanValue()); + builder.addPropertyValue("readOnly", Boolean.valueOf(readOnly)); } - List rollbackRules = new LinkedList(); - if (element.hasAttribute("rollback-for")) { - String rollbackForValue = element.getAttribute("rollback-for"); - String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(rollbackForValue); - for (int i = 0; i < exceptionTypeNames.length; i++) { - rollbackRules.add(new RollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i]))); - } - } - if (element.hasAttribute("no-rollback-for")) { - String noRollbackForValue = element.getAttribute("no-rollback-for"); - String[] exceptionTypeNames = StringUtils.commaDelimitedListToStringArray(noRollbackForValue); - for (int i = 0; i < exceptionTypeNames.length; i++) { - rollbackRules.add(new NoRollbackRuleAttribute(StringUtils.trimWhitespace(exceptionTypeNames[i]))); - } - } - attribute.setRollbackRules(rollbackRules); - RootBeanDefinition attributeSourceDefinition = new RootBeanDefinition(MatchAlwaysTransactionAttributeSource.class); - attributeSourceDefinition.setSource(parserContext.extractSource(element)); - attributeSourceDefinition.getPropertyValues().addPropertyValue("transactionAttribute", attribute); - String attributeSourceBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( - attributeSourceDefinition, parserContext.getRegistry()); - builder.addPropertyReference("transactionAttributeSource", attributeSourceBeanName); return BeanDefinitionReaderUtils.registerWithGeneratedName(builder.getBeanDefinition(), parserContext.getRegistry()); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index be08382c36..da7c27a7a6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -476,22 +476,6 @@ ]]> - - - - - - - - - - \ No newline at end of file diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index bc24696bbc..b196d2df61 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -19,19 +19,17 @@ package org.springframework.integration.endpoint; import java.util.ArrayList; import java.util.List; -import org.aopalliance.aop.Advice; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanNameAware; import org.springframework.context.Lifecycle; -import org.springframework.integration.ConfigurationException; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.message.Message; import org.springframework.integration.message.MessageRejectedException; +import org.springframework.integration.message.MessageTarget; import org.springframework.integration.scheduling.Schedule; /** @@ -53,7 +51,7 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware private MessageChannel outputChannel; - private final List interceptors = new ArrayList(); + private final List interceptors = new ArrayList(); private volatile Schedule schedule; @@ -157,27 +155,18 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware this.autoStartup = autoStartup; } - public void addInterceptor(Object interceptor) { - if (interceptor instanceof Advice) { - this.interceptors.add((Advice) interceptor); - } - else if (interceptor instanceof EndpointInterceptor) { - this.interceptors.add(new EndpointMethodInterceptor((EndpointInterceptor) interceptor)); - } - else { - throw new ConfigurationException("Interceptor must implement either " - + "'" + Advice.class.getName() + "' or '" + EndpointInterceptor.class.getName() + "'."); - } + public void addInterceptor(EndpointInterceptor interceptor) { + this.interceptors.add(interceptor); } - public void setInterceptors(List interceptors) { + public void setInterceptors(List interceptors) { this.interceptors.clear(); - for (Object interceptor : interceptors) { + for (EndpointInterceptor interceptor : interceptors) { this.addInterceptor(interceptor); } } - public List getInterceptors() { + public List getInterceptors() { return this.interceptors; } @@ -221,6 +210,37 @@ public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware } public final boolean send(Message message) { + return this.send(message, 0); + } + + private boolean send(final Message message, final int index) { + boolean result = false; + if (index == 0) { + for (EndpointInterceptor interceptor : interceptors) { + if (!interceptor.preSend(message)) { + return false; + } + } + } + if (index == interceptors.size()) { + return this.doSend(message); + } + EndpointInterceptor nextInterceptor = interceptors.get(index); + result = nextInterceptor.aroundSend(message, new MessageTarget() { + @SuppressWarnings("unchecked") + public boolean send(Message message) { + return AbstractEndpoint.this.send(message, index + 1); + } + }); + if (index == this.interceptors.size()) { + for (EndpointInterceptor interceptor : this.interceptors) { + interceptor.postSend(message, result); + } + } + return result; + } + + private boolean doSend(Message message) { if (message == null || message.getPayload() == null) { throw new IllegalArgumentException("Message and its payload must not be null."); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java index 3a0283a705..2bcca6a5e4 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointInterceptor.java @@ -16,19 +16,18 @@ package org.springframework.integration.endpoint; -import org.aopalliance.intercept.MethodInvocation; - import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; /** * @author Mark Fisher */ public interface EndpointInterceptor { - boolean preInvoke(Message message); + boolean preSend(Message message); - boolean aroundInvoke(MethodInvocation invocation) throws Throwable; + boolean aroundSend(Message message, MessageTarget endpoint); - void postInvoke(Message message, boolean result); + void postSend(Message message, boolean result); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java deleted file mode 100644 index bcdba10c74..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EndpointMethodInterceptor.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.endpoint; - -import org.aopalliance.intercept.MethodInterceptor; -import org.aopalliance.intercept.MethodInvocation; - -import org.springframework.integration.message.Message; -import org.springframework.util.Assert; - -/** - * @author Mark Fisher - */ -public class EndpointMethodInterceptor implements MethodInterceptor { - - private final EndpointInterceptor interceptor; - - - public EndpointMethodInterceptor(EndpointInterceptor interceptor) { - Assert.notNull(interceptor, "EndpointInterceptor must not be null."); - this.interceptor = interceptor; - } - - - public Object invoke(MethodInvocation invocation) throws Throwable { - Message message = null; - try { - message = (Message) invocation.getArguments()[0]; - } - catch (Exception e) { - throw new IllegalStateException("EndpointMethodInterceptor is only applicable for the " - + "'MessageEndpoint.invoke(Message message)' method."); - } - if (!this.interceptor.preInvoke(message)) { - return Boolean.FALSE; - } - boolean returnValue = this.interceptor.aroundInvoke(invocation); - this.interceptor.postInvoke(message, returnValue); - return returnValue; - } - -} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/ConcurrencyInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/ConcurrencyInterceptor.java index a536ec8b63..496ccc307f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/ConcurrencyInterceptor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/ConcurrencyInterceptor.java @@ -23,7 +23,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; -import org.aopalliance.intercept.MethodInvocation; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -39,6 +38,7 @@ import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.handler.MessageHandlerNotRunningException; import org.springframework.integration.handler.MessageHandlerRejectedExecutionException; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; import org.springframework.integration.scheduling.MessagePublishingErrorHandler; import org.springframework.integration.util.ErrorHandler; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; @@ -108,16 +108,15 @@ public class ConcurrencyInterceptor extends EndpointInterceptorAdapter } @Override - public boolean aroundInvoke(final MethodInvocation invocation) { - final Message message = (Message) invocation.getArguments()[0]; - if (invocation.getThis() instanceof Lifecycle && !((Lifecycle) invocation.getThis()).isRunning()) { + public boolean aroundSend(final Message message, final MessageTarget endpoint) { + if (endpoint instanceof Lifecycle && !((Lifecycle) endpoint).isRunning()) { throw new MessageHandlerNotRunningException(message); } try { this.executor.execute(new Runnable() { public void run() { try { - invocation.proceed(); + endpoint.send(message); } catch (Throwable t) { if (logger.isDebugEnabled()) { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java index fcafb5a8f4..a2987f1f00 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/EndpointInterceptorAdapter.java @@ -16,10 +16,9 @@ package org.springframework.integration.endpoint.interceptor; -import org.aopalliance.intercept.MethodInvocation; - import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; /** * A convenience base class for implementing {@link EndpointInterceptor EndpointInterceptors}. @@ -28,15 +27,15 @@ import org.springframework.integration.message.Message; */ public class EndpointInterceptorAdapter implements EndpointInterceptor { - public boolean preInvoke(Message message) { + public boolean preSend(Message message) { return true; } - public boolean aroundInvoke(MethodInvocation invocation) throws Throwable { - return (Boolean) invocation.proceed(); + public boolean aroundSend(Message message, MessageTarget endpoint) { + return endpoint.send(message); } - public void postInvoke(Message message, boolean result) { + public void postSend(Message message, boolean result) { } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptor.java new file mode 100644 index 0000000000..0467b3595c --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/interceptor/TransactionInterceptor.java @@ -0,0 +1,102 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint.interceptor; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.integration.ConfigurationException; +import org.springframework.integration.endpoint.EndpointInterceptor; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * An {@link EndpointInterceptor} implementation that provides transactional + * behavior with a {@link PlatformTransactionManager}. + * + * @author Mark Fisher + */ +public class TransactionInterceptor extends EndpointInterceptorAdapter implements InitializingBean { + + private final Log logger = LogFactory.getLog(this.getClass()); + + private final PlatformTransactionManager transactionManager; + + private volatile TransactionTemplate transactionTemplate; + + private volatile String propagationBehaviorName; + + private volatile String isolationLevelName; + + private volatile int timeout; + + private volatile boolean readOnly; + + + public TransactionInterceptor(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + + public void setPropagationBehaviorName(String propagationBehaviorName) { + this.propagationBehaviorName = propagationBehaviorName; + } + + public void setIsolationLevelName(String isolationLevelName) { + this.isolationLevelName = isolationLevelName; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + } + + public void setReadOnly(boolean readOnly) { + this.readOnly = readOnly; + } + + public void afterPropertiesSet() { + TransactionTemplate template = new TransactionTemplate(this.transactionManager); + template.setPropagationBehaviorName(this.propagationBehaviorName); + template.setIsolationLevelName(this.isolationLevelName); + template.setTimeout(this.timeout); + template.setReadOnly(this.readOnly); + this.transactionTemplate = template; + } + + @Override + public boolean aroundSend(final Message message, final MessageTarget endpoint) { + if (this.transactionTemplate == null) { + throw new ConfigurationException("TransactionInterceptor has not been initialized"); + } + this.transactionTemplate.execute(new TransactionCallback() { + public Object doInTransaction(TransactionStatus status) { + if (logger.isDebugEnabled()) { + logger.debug("Executing endpoint '" + endpoint + "' within transaction [" + status + "]"); + } + endpoint.send(message); + return null; + } + }); + return true; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java new file mode 100644 index 0000000000..6369014852 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/EndpointInterceptorTests.java @@ -0,0 +1,112 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.integration.channel.MessageChannel; +import org.springframework.integration.endpoint.AbstractEndpoint; +import org.springframework.integration.endpoint.EndpointPoller; +import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.SourceEndpoint; +import org.springframework.integration.message.GenericMessage; +import org.springframework.integration.message.StringMessage; + +/** + * @author Mark Fisher + */ +public class EndpointInterceptorTests { + + @Test + public void testHandlerEndpointWithBeanInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithBeanInterceptors"); + testInterceptors(endpoint, context, true); + } + + @Test + public void testHandlerEndpointWithRefInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithRefInterceptors"); + testInterceptors(endpoint, context, false); + } + + @Test + public void testTargetEndpointWithBeanInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithBeanInterceptors"); + testInterceptors(endpoint, context, true); + } + + @Test + public void testTargetEndpointWithRefInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithRefInterceptors"); + testInterceptors(endpoint, context, false); + } + + @Test + public void testSourceEndpointWithBeanInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithBeanInterceptors"); + testInterceptors(endpoint, context, true); + } + + @Test + public void testSourceEndpointWithRefInterceptors() { + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "endpointInterceptorTests.xml", this.getClass()); + MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithRefInterceptors"); + testInterceptors(endpoint, context, false); + } + + + private static void testInterceptors(MessageEndpoint endpoint, ClassPathXmlApplicationContext context, boolean innerBeans) { + TestPreSendInterceptor preInterceptor = null; + TestAroundSendEndpointInterceptor aroundInterceptor = null; + if (innerBeans) { + preInterceptor = (TestPreSendInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(0); + aroundInterceptor = (TestAroundSendEndpointInterceptor) ((AbstractEndpoint) endpoint).getInterceptors().get(1); + } + else { + preInterceptor = (TestPreSendInterceptor) context.getBean("preInterceptor"); + aroundInterceptor = (TestAroundSendEndpointInterceptor) context.getBean("aroundInterceptor"); + } + assertEquals(0, preInterceptor.getCount()); + assertEquals(0, aroundInterceptor.getCount()); + if (endpoint instanceof SourceEndpoint) { + MessageChannel channel = (MessageChannel) context.getBean("testChannel"); + channel.send(new StringMessage("foo")); + endpoint.send(new GenericMessage(new EndpointPoller())); + } + else { + endpoint.send(new StringMessage("test")); + } + assertEquals(1, preInterceptor.getCount()); + assertEquals(2, aroundInterceptor.getCount()); + context.stop(); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java deleted file mode 100644 index b1312c3859..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageEndpointBeanPostProcessorTests.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.config; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import org.springframework.aop.support.AopUtils; -import org.springframework.context.ApplicationContext; -import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.endpoint.EndpointPoller; -import org.springframework.integration.endpoint.MessageEndpoint; -import org.springframework.integration.message.GenericMessage; -import org.springframework.integration.message.StringMessage; - -/** - * @author Mark Fisher - */ -public class MessageEndpointBeanPostProcessorTests { - - @Test - public void testNoProxyCreatedForHandlerEndpointWithEmptyAdviceChain() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithoutAdvice"); - assertFalse(AopUtils.isAopProxy(endpoint)); - } - - @Test - public void testHandlerEndpointWithAdviceChain() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("handlerEndpointWithAdvice"); - assertTrue(AopUtils.isAopProxy(endpoint)); - TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); - TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); - assertEquals(0, beforeAdvice.getCount()); - assertEquals(0, interceptor.getCount()); - endpoint.send(new StringMessage("test")); - assertEquals(1, beforeAdvice.getCount()); - assertEquals(2, interceptor.getCount()); - context.stop(); - } - - @Test - public void testNoProxyCreatedForTargetEndpointWithEmptyAdviceChain() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithoutAdvice"); - assertFalse(AopUtils.isAopProxy(endpoint)); - } - - @Test - public void testTargetEndpointWithAdviceChain() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("targetEndpointWithAdvice"); - assertTrue(AopUtils.isAopProxy(endpoint)); - TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); - TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); - assertEquals(0, beforeAdvice.getCount()); - assertEquals(0, interceptor.getCount()); - endpoint.send(new StringMessage("test")); - assertEquals(1, beforeAdvice.getCount()); - assertEquals(2, interceptor.getCount()); - context.stop(); - } - - @Test - public void testNoProxyCreatedForSourceEndpointWithEmptyAdviceChain() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithoutAdvice"); - assertFalse(AopUtils.isAopProxy(endpoint)); - } - - @Test - public void testSourceEndpointWithAdviceChain() { - ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "messageEndpointBeanPostProcessorTests.xml", this.getClass()); - MessageEndpoint endpoint = (MessageEndpoint) context.getBean("sourceEndpointWithAdvice"); - assertTrue(AopUtils.isAopProxy(endpoint)); - TestBeforeAdvice beforeAdvice = (TestBeforeAdvice) context.getBean("simpleAdvice"); - TestEndpointInterceptor interceptor = (TestEndpointInterceptor) context.getBean("interceptor"); - assertEquals(0, beforeAdvice.getCount()); - assertEquals(0, interceptor.getCount()); - endpoint.send(new GenericMessage(new EndpointPoller())); - assertEquals(1, beforeAdvice.getCount()); - assertEquals(2, interceptor.getCount()); - context.stop(); - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAroundSendEndpointInterceptor.java similarity index 66% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java rename to org.springframework.integration/src/test/java/org/springframework/integration/config/TestAroundSendEndpointInterceptor.java index 3726b4afbd..1022bdd7eb 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestBeforeAdvice.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestAroundSendEndpointInterceptor.java @@ -16,15 +16,16 @@ package org.springframework.integration.config; -import java.lang.reflect.Method; import java.util.concurrent.atomic.AtomicInteger; -import org.springframework.aop.MethodBeforeAdvice; +import org.springframework.integration.endpoint.interceptor.EndpointInterceptorAdapter; +import org.springframework.integration.message.Message; +import org.springframework.integration.message.MessageTarget; /** * @author Mark Fisher */ -public class TestBeforeAdvice implements MethodBeforeAdvice { +public class TestAroundSendEndpointInterceptor extends EndpointInterceptorAdapter { private AtomicInteger counter = new AtomicInteger(); @@ -33,8 +34,12 @@ public class TestBeforeAdvice implements MethodBeforeAdvice { return this.counter.get(); } - public void before(Method method, Object[] args, Object target) throws Throwable { + @Override + public boolean aroundSend(Message message, MessageTarget endpoint) { this.counter.incrementAndGet(); + boolean result = endpoint.send(message); + this.counter.incrementAndGet(); + return result; } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestPreSendInterceptor.java similarity index 76% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java rename to org.springframework.integration/src/test/java/org/springframework/integration/config/TestPreSendInterceptor.java index 80c0cbaebe..839d3eb78f 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/TestEndpointInterceptor.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/TestPreSendInterceptor.java @@ -18,14 +18,13 @@ package org.springframework.integration.config; import java.util.concurrent.atomic.AtomicInteger; -import org.aopalliance.intercept.MethodInvocation; - import org.springframework.integration.endpoint.interceptor.EndpointInterceptorAdapter; +import org.springframework.integration.message.Message; /** * @author Mark Fisher */ -public class TestEndpointInterceptor extends EndpointInterceptorAdapter { +public class TestPreSendInterceptor extends EndpointInterceptorAdapter { private AtomicInteger counter = new AtomicInteger(); @@ -34,11 +33,10 @@ public class TestEndpointInterceptor extends EndpointInterceptorAdapter { return this.counter.get(); } - public boolean aroundInvoke(MethodInvocation invocation) throws Throwable { + @Override + public boolean preSend(Message message) { this.counter.incrementAndGet(); - Boolean result = (Boolean) invocation.proceed(); - this.counter.incrementAndGet(); - return result; + return true; } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index 041e6e073b..b0a01e9e90 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -29,7 +29,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.aopalliance.aop.Advice; import org.junit.Test; import org.springframework.aop.framework.ProxyFactory; @@ -53,8 +52,8 @@ import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.MessageChannel; import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.endpoint.EndpointInterceptor; import org.springframework.integration.endpoint.HandlerEndpoint; -import org.springframework.integration.endpoint.interceptor.ConcurrencyInterceptor; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Message; import org.springframework.integration.message.StringMessage; @@ -184,11 +183,9 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.postProcessAfterInitialization(testBean, "testBean"); HandlerEndpoint endpoint = (HandlerEndpoint) messageBus.lookupEndpoint("testBean.MessageHandler.endpoint"); assertEquals(1, endpoint.getInterceptors().size()); - Advice interceptor = endpoint.getInterceptors().get(0); + EndpointInterceptor interceptor = endpoint.getInterceptors().get(0); DirectFieldAccessor accessor = new DirectFieldAccessor(interceptor); - ConcurrencyInterceptor concurrencyInterceptor = (ConcurrencyInterceptor) - accessor.getPropertyValue("interceptor"); - accessor = new DirectFieldAccessor(concurrencyInterceptor); + accessor = new DirectFieldAccessor(interceptor); ConcurrentTaskExecutor cte = (ConcurrentTaskExecutor) accessor.getPropertyValue("executor"); ThreadPoolExecutor executor = (ThreadPoolExecutor) cte.getConcurrentExecutor(); assertEquals(17, executor.getCorePoolSize()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml new file mode 100644 index 0000000000..00cd10b5d6 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/endpointInterceptorTests.xml @@ -0,0 +1,88 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml deleted file mode 100644 index cd3ce0699e..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageEndpointBeanPostProcessorTests.xml +++ /dev/null @@ -1,76 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -