diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ChannelAdapterAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ChannelAdapterAnnotationPostProcessor.java index 370c06ba9f..788f5e0df1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ChannelAdapterAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/ChannelAdapterAnnotationPostProcessor.java @@ -17,6 +17,10 @@ package org.springframework.integration.config.annotation; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.aopalliance.aop.Advice; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.core.annotation.AnnotationUtils; @@ -36,8 +40,15 @@ import org.springframework.integration.message.MethodInvokingConsumer; import org.springframework.integration.message.MethodInvokingSource; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.interceptor.NoRollbackRuleAttribute; +import org.springframework.transaction.interceptor.RollbackRuleAttribute; +import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; +import org.springframework.transaction.interceptor.TransactionAttribute; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; +import org.springframework.util.StringUtils; /** * Post-processor for methods annotated with {@link ChannelAdapter @ChannelAdapter}. @@ -102,6 +113,27 @@ public class ChannelAdapterAnnotationPostProcessor implements MethodAnnotationPo adapter.setSource(source); adapter.setOutputChannel(channel); adapter.setTrigger(trigger); + if (StringUtils.hasText(pollerAnnotation.transactionManager())) { + String txManagerRef = pollerAnnotation.transactionManager(); + Assert.isTrue(this.beanFactory.containsBean(txManagerRef), + "failed to resolve transactionManager reference, no such bean '" + txManagerRef + "'"); + PlatformTransactionManager txManager = (PlatformTransactionManager) + this.beanFactory.getBean(txManagerRef, PlatformTransactionManager.class); + adapter.setTransactionManager(txManager); + Transactional txAnnotation = pollerAnnotation.transactionAttributes(); + adapter.setTransactionDefinition(this.parseTransactionAnnotation(txAnnotation)); + } + String[] adviceChainArray = pollerAnnotation.adviceChain(); + if (adviceChainArray.length > 0) { + List adviceChain = new ArrayList(); + for (String adviceChainString : adviceChainArray) { + String[] adviceRefs = StringUtils.tokenizeToStringArray(adviceChainString, ","); + for (String adviceRef : adviceRefs) { + adviceChain.add((Advice) this.beanFactory.getBean(adviceRef, Advice.class)); + } + } + adapter.setAdviceChain(adviceChain); + } return adapter; } @@ -132,4 +164,39 @@ public class ChannelAdapterAnnotationPostProcessor implements MethodAnnotationPo return !method.getReturnType().equals(void.class); } + @SuppressWarnings("unchecked") + private TransactionAttribute parseTransactionAnnotation(Transactional annotation) { + if (annotation == null) { + return null; + } + RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); + rbta.setPropagationBehavior(annotation.propagation().value()); + rbta.setIsolationLevel(annotation.isolation().value()); + rbta.setTimeout(annotation.timeout()); + rbta.setReadOnly(annotation.readOnly()); + ArrayList rollBackRules = new ArrayList(); + Class[] rbf = annotation.rollbackFor(); + for (int i = 0; i < rbf.length; ++i) { + RollbackRuleAttribute rule = new RollbackRuleAttribute(rbf[i]); + rollBackRules.add(rule); + } + String[] rbfc = annotation.rollbackForClassName(); + for (int i = 0; i < rbfc.length; ++i) { + RollbackRuleAttribute rule = new RollbackRuleAttribute(rbfc[i]); + rollBackRules.add(rule); + } + Class[] nrbf = annotation.noRollbackFor(); + for (int i = 0; i < nrbf.length; ++i) { + NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(nrbf[i]); + rollBackRules.add(rule); + } + String[] nrbfc = annotation.noRollbackForClassName(); + for (int i = 0; i < nrbfc.length; ++i) { + NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(nrbfc[i]); + rollBackRules.add(rule); + } + rbta.getRollbackRules().addAll(rollBackRules); + return rbta; + } + } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTestBean.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTestBean.java new file mode 100644 index 0000000000..f86e67bb3e --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTestBean.java @@ -0,0 +1,38 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import java.util.concurrent.TimeUnit; + +import org.springframework.integration.annotation.ChannelAdapter; +import org.springframework.integration.annotation.MessageEndpoint; +import org.springframework.integration.annotation.Poller; + +/** + * @author Mark Fisher + */ +@MessageEndpoint +public class PollerAnnotationChannelAdapterAdviceChainTestBean { + + @ChannelAdapter("output") + @Poller(interval=5, timeUnit=TimeUnit.SECONDS, maxMessagesPerPoll=1, + adviceChain="aroundAdvice,beforeAdvice,afterAdvice") + public String testMethod() { + return "test"; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests-context.xml new file mode 100644 index 0000000000..929485164b --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests-context.xml @@ -0,0 +1,37 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests.java new file mode 100644 index 0000000000..513275feb5 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterAdviceChainTests.java @@ -0,0 +1,69 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.message.Message; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +/** + * @author Mark Fisher + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +public class PollerAnnotationChannelAdapterAdviceChainTests { + + @Autowired @Qualifier("output") + private PollableChannel output; + + @Autowired + private CountDownLatch latch; + + @Autowired + private TestBeforeAdvice beforeAdvice; + + @Autowired + private TestAfterAdvice afterAdvice; + + @Autowired + private TestAroundAdvice aroundAdvice; + + + @Test + public void testAdviceChain() throws InterruptedException { + Message reply = output.receive(1000); + assertEquals("test", reply.getPayload()); + latch.await(1, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + assertEquals(4, aroundAdvice.getPreCount()); + assertEquals(3, beforeAdvice.getLatchCount()); + assertEquals(2, afterAdvice.getLatchCount()); + assertEquals(1, aroundAdvice.getPostCount()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTestBean.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTestBean.java new file mode 100644 index 0000000000..05eff4dd19 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTestBean.java @@ -0,0 +1,68 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import org.springframework.integration.annotation.ChannelAdapter; +import org.springframework.integration.annotation.MessageEndpoint; +import org.springframework.integration.annotation.Poller; +import org.springframework.transaction.annotation.Propagation; +import org.springframework.transaction.annotation.Transactional; + +/** + * @author Mark Fisher + */ +@MessageEndpoint +public class PollerAnnotationChannelAdapterTransactionalTestBean { + + private volatile boolean shouldFail; + + private BlockingQueue queue = new ArrayBlockingQueue(1); + + + public void setShouldFail(boolean shouldFail) { + this.shouldFail = shouldFail; + } + + public void setNextValue(String nextValue) { + try { + this.queue.put(nextValue); + } + catch (InterruptedException e) { + } + } + + @ChannelAdapter("output") + @Poller(interval=100, maxMessagesPerPoll=1, + transactionManager="testTransactionManager", + transactionAttributes=@Transactional(propagation=Propagation.REQUIRES_NEW)) + public String testMethod() { + String result = null; + try { + result = this.queue.take(); + } + catch (InterruptedException e) { + } + if (this.shouldFail) { + throw new IllegalArgumentException("intentional test failure"); + } + return result; + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests-context.xml new file mode 100644 index 0000000000..a466b5216f --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests-context.xml @@ -0,0 +1,23 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests.java new file mode 100644 index 0000000000..6b07bc16a2 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationChannelAdapterTransactionalTests.java @@ -0,0 +1,87 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.config; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.integration.channel.PollableChannel; +import org.springframework.integration.message.Message; +import org.springframework.integration.util.TestTransactionManager; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.transaction.TransactionDefinition; + +/** + * @author Mark Fisher + */ +@ContextConfiguration +@RunWith(SpringJUnit4ClassRunner.class) +public class PollerAnnotationChannelAdapterTransactionalTests { + + @Autowired @Qualifier("output") + private PollableChannel output; + + @Autowired + private PollerAnnotationChannelAdapterTransactionalTestBean adapter; + + @Autowired + private TestTransactionManager transactionManager; + + + @Before + public void resetTransactionManager() { + transactionManager.reset(); + } + + + @Test + public void commit() throws InterruptedException { + adapter.setShouldFail(false); + adapter.setNextValue("commit-test"); + transactionManager.waitForCompletion(1000); + Message reply = output.receive(1000); + assertEquals("commit-test", reply.getPayload()); + assertEquals(1, transactionManager.getCommitCount()); + assertEquals(0, transactionManager.getRollbackCount()); + } + + @Test + public void rollback() throws InterruptedException { + adapter.setShouldFail(true); + adapter.setNextValue("rollback-test"); + transactionManager.waitForCompletion(1000); + assertNull(output.receive(0)); + assertEquals(0, transactionManager.getCommitCount()); + assertEquals(1, transactionManager.getRollbackCount()); + } + + @Test + public void verifyPropagationSetting() throws InterruptedException { + adapter.setNextValue("propagation-test"); + transactionManager.waitForCompletion(1000); + assertEquals(TransactionDefinition.PROPAGATION_REQUIRES_NEW, + transactionManager.getLastDefinition().getPropagationBehavior()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationAdviceChainTestBean.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTestBean.java similarity index 95% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationAdviceChainTestBean.java rename to org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTestBean.java index 1df4db180d..70ce3c9c8b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationAdviceChainTestBean.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTestBean.java @@ -26,7 +26,7 @@ import org.springframework.integration.annotation.ServiceActivator; * @author Mark Fisher */ @MessageEndpoint -public class PollerAnnotationAdviceChainTestBean { +public class PollerAnnotationConsumerAdviceChainTestBean { @ServiceActivator(inputChannel="input", outputChannel="output") @Poller(interval=5, timeUnit=TimeUnit.SECONDS, maxMessagesPerPoll=1, diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTests-context.xml index 9c57ceed9b..88d88976ee 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerAdviceChainTests-context.xml @@ -20,7 +20,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationTransactionalTestBean.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTestBean.java similarity index 95% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationTransactionalTestBean.java rename to org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTestBean.java index 2a01794c85..6169771efd 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationTransactionalTestBean.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTestBean.java @@ -26,7 +26,7 @@ import org.springframework.transaction.annotation.Transactional; * @author Mark Fisher */ @MessageEndpoint -public class PollerAnnotationTransactionalTestBean { +public class PollerAnnotationConsumerTransactionalTestBean { @ServiceActivator(inputChannel="input", outputChannel="output") @Poller(interval=100, maxMessagesPerPoll=1, diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTests-context.xml index c9dc006908..1ee5eadf23 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/PollerAnnotationConsumerTransactionalTests-context.xml @@ -20,7 +20,7 @@ - +