diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java index 30822f11ce..6ecfe6f874 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/MessageBusParser.java @@ -16,15 +16,12 @@ package org.springframework.integration.config; -import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; - import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; @@ -33,6 +30,9 @@ import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.MessageBusAwareBeanPostProcessor; import org.springframework.util.StringUtils; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; /** * Parser for the message-bus element of the integration namespace. @@ -51,10 +51,16 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { private static final String ERROR_CHANNEL_ATTRIBUTE = "error-channel"; private static final String DEFAULT_CONCURRENCY_ELEMENT = "default-concurrency"; - + private static final String DEFAULT_CONCURRENCY_PROPERTY = "defaultConcurrencyPolicy"; private static final String CHANNEL_FACTORY_ATTRIBUTE = "channel-factory"; + + private static final String INTERCEPTOR_ELEMENT = "interceptor"; + + private static final String REFERENCE_ATTRIBUTE = "ref"; + + private static final String INTERCEPTORS_PROPERTY = "interceptors"; @Override @@ -96,6 +102,7 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { private void processChildElements(BeanDefinitionBuilder beanDefinition, Element element) { NodeList childNodes = element.getChildNodes(); + ManagedList interceptors = new ManagedList(); for (int i = 0; i < childNodes.getLength(); i++) { Node child = childNodes.item(i); if (child.getNodeType() == Node.ELEMENT_NODE) { @@ -104,8 +111,14 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { beanDefinition.addPropertyValue(DEFAULT_CONCURRENCY_PROPERTY, IntegrationNamespaceUtils.parseConcurrencyPolicy((Element) child)); } + if (INTERCEPTOR_ELEMENT.equals(localName)) { + interceptors.add(new RuntimeBeanReference(((Element)child).getAttribute(REFERENCE_ATTRIBUTE))); + } } } + if (interceptors.size() > 0) { + beanDefinition.addPropertyValue(INTERCEPTORS_PROPERTY, interceptors); + } } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd index 826ad28c4f..cd396eb888 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/spring-integration-core-1.0.xsd @@ -26,6 +26,7 @@ + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java index 9a50264f9b..4a34ecb885 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java @@ -19,9 +19,8 @@ package org.springframework.integration.bus.interceptor; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.util.concurrent.atomic.AtomicInteger; - import org.junit.Test; + import org.springframework.integration.bus.MessageBus; /** @@ -38,6 +37,12 @@ public class MessageBusInterceptorTests { messageBus.addInterceptor(startInterceptor); messageBus.addInterceptor(stopInterceptor); // check the state of the interceptors + executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor); + } + + public static void executeInterceptorsTest(MessageBus messageBus, TestMessageBusStartInterceptor startInterceptor, + TestMessageBusStopInterceptor stopInterceptor) { + assertTrue(!messageBus.isRunning()); assertEquals(startInterceptor.getPreStartCounter().get(), 0); assertEquals(startInterceptor.getPostStartCounter().get(), 0); assertEquals(stopInterceptor.getPreStopCounter().get(), 0); @@ -57,64 +62,6 @@ public class MessageBusInterceptorTests { assertEquals(stopInterceptor.getPreStopCounter().get(), 1); assertEquals(stopInterceptor.getPostStopCounter().get(), 1); } - - private static class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter { - - private AtomicInteger preStartCounter = new AtomicInteger(0); - - private AtomicInteger postStartCounter = new AtomicInteger(0); - - - public AtomicInteger getPreStartCounter() { - return preStartCounter; - } - - public AtomicInteger getPostStartCounter() { - return postStartCounter; - } - - @Override - public void preStart(MessageBus bus) { - this.preStartCounter.incrementAndGet(); - assertTrue(!bus.isRunning()); - } - - @Override - public void postStart(MessageBus bus) { - this.postStartCounter.incrementAndGet(); - assertTrue(bus.isRunning()); - } - - } - - private static class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter { - - private AtomicInteger preStopCounter = new AtomicInteger(0); - - private AtomicInteger postStopCounter = new AtomicInteger(0); - - - public AtomicInteger getPreStopCounter() { - return preStopCounter; - } - - public AtomicInteger getPostStopCounter() { - return postStopCounter; - } - - @Override - public void preStop(MessageBus bus) { - this.preStopCounter.incrementAndGet(); - assertTrue(bus.isRunning()); - } - - @Override - public void postStop(MessageBus bus) { - this.postStopCounter.incrementAndGet(); - assertTrue(!bus.isRunning()); - } - - } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStartInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStartInterceptor.java new file mode 100644 index 0000000000..c9a609e61c --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStartInterceptor.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus.interceptor; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.integration.bus.MessageBus; + +/** + * @author Marius Bogoevici +*/ +public class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter { + + private AtomicInteger preStartCounter = new AtomicInteger(0); + + private AtomicInteger postStartCounter = new AtomicInteger(0); + + + public AtomicInteger getPreStartCounter() { + return preStartCounter; + } + + public AtomicInteger getPostStartCounter() { + return postStartCounter; + } + + @Override + public void preStart(MessageBus bus) { + this.preStartCounter.incrementAndGet(); + org.junit.Assert.assertTrue(!bus.isRunning()); + } + + @Override + public void postStart(MessageBus bus) { + this.postStartCounter.incrementAndGet(); + org.junit.Assert.assertTrue(bus.isRunning()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStopInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStopInterceptor.java new file mode 100644 index 0000000000..90c715fa68 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/TestMessageBusStopInterceptor.java @@ -0,0 +1,53 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus.interceptor; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.springframework.integration.bus.MessageBus; + +/** + * @author Marius Bogoevici +*/ +public class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter { + + private AtomicInteger preStopCounter = new AtomicInteger(0); + + private AtomicInteger postStopCounter = new AtomicInteger(0); + + + public AtomicInteger getPreStopCounter() { + return preStopCounter; + } + + public AtomicInteger getPostStopCounter() { + return postStopCounter; + } + + @Override + public void preStop(MessageBus bus) { + this.preStopCounter.incrementAndGet(); + org.junit.Assert.assertTrue(bus.isRunning()); + } + + @Override + public void postStop(MessageBus bus) { + this.postStopCounter.incrementAndGet(); + org.junit.Assert.assertTrue(!bus.isRunning()); + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java index 0231fc6cc8..02b3f491a3 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java @@ -33,6 +33,9 @@ import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.ConfigurationException; import org.springframework.integration.bus.MessageBus; import org.springframework.integration.bus.TestMessageBusAwareImpl; +import org.springframework.integration.bus.interceptor.MessageBusInterceptorTests; +import org.springframework.integration.bus.interceptor.TestMessageBusStartInterceptor; +import org.springframework.integration.bus.interceptor.TestMessageBusStopInterceptor; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.dispatcher.DirectChannel; import org.springframework.integration.endpoint.TargetEndpoint; @@ -64,7 +67,7 @@ public class MessageBusParserTests { assertNotNull("bus should have created a default error channel", bus.getErrorChannel()); } - @Test(expected=ConfigurationException.class) + @Test(expected = ConfigurationException.class) public void testAutoCreateChannelsDisabledByDefault() { ApplicationContext context = new ClassPathXmlApplicationContext( "messageBusWithDefaults.xml", this.getClass()); @@ -110,7 +113,7 @@ public class MessageBusParserTests { // tries to get a reference to the message bus assertEquals(BeanCreationException.class, e.getCause().getClass()); assertEquals(e.getBeanName(), MessageBusParser.MESSAGE_BUS_AWARE_POST_PROCESSOR_BEAN_NAME); - assertEquals(ConfigurationException.class, ((BeanCreationException) e.getCause()).getCause().getClass()); + assertEquals(ConfigurationException.class, (e.getCause()).getCause().getClass()); assertEquals(((BeanCreationException) e.getCause()).getBeanName(), MessageBusParser.MESSAGE_BUS_BEAN_NAME); } assertTrue(exceptionThrown); @@ -140,9 +143,9 @@ public class MessageBusParserTests { "messageBusWithDefaultConcurrencyTests.xml", this.getClass()); TargetEndpoint endpoint2 = (TargetEndpoint) context.getBean("endpoint2"); assertEquals(14, endpoint2.getConcurrencyPolicy().getCoreSize()); - assertEquals(17, endpoint2.getConcurrencyPolicy().getMaxSize()); + assertEquals(17, endpoint2.getConcurrencyPolicy().getMaxSize()); } - + @Test public void testMessageBusAwareAutomaticallyAddedByNamespace() { ApplicationContext context = new ClassPathXmlApplicationContext( @@ -153,7 +156,7 @@ public class MessageBusParserTests { @Test public void testMessageBusWithChannelFactory() { - ApplicationContext context = new ClassPathXmlApplicationContext("messageBusWithChannelFactory.xml", + ApplicationContext context = new ClassPathXmlApplicationContext("messageBusWithChannelFactory.xml", this.getClass()); assertEquals(DirectChannel.class, context.getBean("defaultTypeChannel").getClass()); assertEquals(QueueChannel.class, context.getBean("specifiedTypeChannel").getClass()); @@ -194,4 +197,16 @@ public class MessageBusParserTests { assertEquals(SimpleMessagingTaskScheduler.class, taskExecutor.getClass()); } + @Test + public void testMessageBusWithInterceptors() { + ApplicationContext context = new ClassPathXmlApplicationContext( + "messageBusWithInterceptors.xml", this.getClass()); + MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); + TestMessageBusStartInterceptor startInterceptor = (TestMessageBusStartInterceptor) context.getBean( + "startInterceptor"); + TestMessageBusStopInterceptor stopInterceptor = (TestMessageBusStopInterceptor) context.getBean( + "stopInterceptor"); + MessageBusInterceptorTests.executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor); + } + } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml new file mode 100644 index 0000000000..e42424b265 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml @@ -0,0 +1,19 @@ + + + + + + + + + + + + +