diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java index 8f109fd3ec..ab5d41d0d0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java @@ -17,9 +17,7 @@ package org.springframework.integration.bus; import java.util.Collection; -import java.util.List; import java.util.Set; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import org.apache.commons.logging.Log; @@ -63,16 +61,12 @@ public class ApplicationContextMessageBus implements MessageBus, ApplicationCont private final Set endpoints = new CopyOnWriteArraySet(); - private final MessageBusInterceptorsList interceptors = new MessageBusInterceptorsList(); - private volatile TaskScheduler taskScheduler; private volatile ApplicationContext applicationContext; private volatile boolean autoStartup = true; - private volatile boolean initialized; - private volatile boolean running; private final Object lifecycleMonitor = new Object(); @@ -189,13 +183,11 @@ public class ApplicationContextMessageBus implements MessageBus, ApplicationCont } public void start() { - if (!this.initialized) { - this.initialize(); - } if (this.running) { return; } - this.interceptors.preStart(); + Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); + Assert.notNull(this.taskScheduler, "TaskScheduler must not be null"); synchronized (this.lifecycleMonitor) { this.activateEndpoints(); if (this.taskScheduler instanceof SimpleTaskScheduler) { @@ -207,7 +199,7 @@ public class ApplicationContextMessageBus implements MessageBus, ApplicationCont this.taskScheduler.start(); } this.running = true; - this.interceptors.postStart(); + this.applicationContext.publishEvent(new MessageBusStartedEvent(this)); if (logger.isInfoEnabled()) { logger.info("message bus started"); } @@ -217,34 +209,26 @@ public class ApplicationContextMessageBus implements MessageBus, ApplicationCont if (!this.isRunning()) { return; } - this.interceptors.preStop(); synchronized (this.lifecycleMonitor) { this.deactivateEndpoints(); this.running = false; this.taskScheduler.stop(); } - this.interceptors.postStop(); + this.applicationContext.publishEvent(new MessageBusStoppedEvent(this)); if (logger.isInfoEnabled()) { logger.info("message bus stopped"); } } + // ApplicationListener implementation + public void onApplicationEvent(ApplicationEvent event) { if (event instanceof ContextRefreshedEvent && this.autoStartup) { this.start(); } } - private void initialize() { - synchronized (this.lifecycleMonitor) { - if (this.initialized) { - return; - } - Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); - Assert.notNull(this.taskScheduler, "TaskScheduler must not be null"); - this.initialized = true; - } - } + // DisposableBean implementation public void destroy() throws Exception { this.stop(); @@ -253,61 +237,4 @@ public class ApplicationContextMessageBus implements MessageBus, ApplicationCont } } - public void addInterceptor(MessageBusInterceptor interceptor) { - this.interceptors.add(interceptor); - } - - public void removeInterceptor(MessageBusInterceptor interceptor) { - this.interceptors.remove(interceptor); - } - - public void setInterceptors(List interceptor) { - this.interceptors.set(interceptor); - } - - /* - * Wrapper class for the interceptor list - */ - private class MessageBusInterceptorsList { - - private CopyOnWriteArrayList messageBusInterceptors = new CopyOnWriteArrayList(); - - public void set(List interceptors) { - this.messageBusInterceptors.clear(); - this.messageBusInterceptors.addAll(interceptors); - } - - public void add(MessageBusInterceptor interceptor) { - this.messageBusInterceptors.add(interceptor); - } - - public void remove(MessageBusInterceptor interceptor) { - this.messageBusInterceptors.remove(interceptor); - } - - public void preStart() { - for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { - messageBusInterceptor.preStart(ApplicationContextMessageBus.this); - } - } - - public void postStart() { - for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { - messageBusInterceptor.postStart(ApplicationContextMessageBus.this); - } - } - - public void preStop() { - for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { - messageBusInterceptor.preStop(ApplicationContextMessageBus.this); - } - } - - public void postStop() { - for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { - messageBusInterceptor.postStop(ApplicationContextMessageBus.this); - } - } - } - } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStartedEvent.java similarity index 61% rename from org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptor.java rename to org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStartedEvent.java index 74ebc9c561..ca1aa824f6 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStartedEvent.java @@ -16,21 +16,22 @@ package org.springframework.integration.bus; +import org.springframework.context.ApplicationEvent; /** - * Interface for interceptors that are able be notified of the - * lifecycle of the {@link MessageBus Message Bus}. - * - * @author Marius Bogoevici + * Event raised when a MessageBus is started. + * + * @author Mark Fisher */ -public interface MessageBusInterceptor { +public class MessageBusStartedEvent extends ApplicationEvent { - void preStart(MessageBus bus); - - void postStart(MessageBus bus); - - void preStop(MessageBus bus); - - void postStop(MessageBus bus); + /** + * Create a new MessageBusStartedEvent + * @param source the MessageBus that has been started + * (must not be null) + */ + public MessageBusStartedEvent(MessageBus source) { + super(source); + } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStoppedEvent.java similarity index 61% rename from org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptorAdapter.java rename to org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStoppedEvent.java index 912081cbdf..e3b2acd98f 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusInterceptorAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBusStoppedEvent.java @@ -16,25 +16,22 @@ package org.springframework.integration.bus; +import org.springframework.context.ApplicationEvent; /** - * No-op implementation of a {@link MessageBusInterceptor}. Subclasses shall - * override only the methods for which they intend to provide behaviour. + * Event raised when a MessageBus is stopped. * - * @author Marius Bogoevici + * @author Mark Fisher */ -public class MessageBusInterceptorAdapter implements MessageBusInterceptor { +public class MessageBusStoppedEvent extends ApplicationEvent { - public void preStart(MessageBus bus) { + /** + * Create a new MessageBusStoppedEvent + * @param source the MessageBus that has been stopped + * (must not be null) + */ + public MessageBusStoppedEvent(MessageBus source) { + super(source); } - - public void postStart(MessageBus bus) { - } - - public void preStop(MessageBus bus) { - } - - public void postStop(MessageBus bus) { - } - + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java index a9b06c00e4..c91ed9654e 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java @@ -20,18 +20,14 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import org.w3c.dom.Element; -import org.w3c.dom.Node; -import org.w3c.dom.NodeList; import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.BeanDefinitionHolder; -import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; -import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; @@ -132,7 +128,6 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME); BeanDefinitionReaderUtils.registerBeanDefinition(holder, parserContext.getRegistry()); } - this.processChildElements(builder, element); this.addPostProcessors(element, parserContext); } @@ -147,23 +142,6 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { return executor; } - @SuppressWarnings("unchecked") - private void processChildElements(BeanDefinitionBuilder builder, Element element) { - NodeList childNodes = element.getChildNodes(); - ManagedList interceptors = new ManagedList(); - for (int i = 0; i < childNodes.getLength(); i++) { - Node child = childNodes.item(i); - if (child.getNodeType() == Node.ELEMENT_NODE) { - if ("interceptor".equals(child.getLocalName())) { - interceptors.add(new RuntimeBeanReference(((Element)child).getAttribute("ref"))); - } - } - } - if (interceptors.size() > 0) { - builder.addPropertyValue("interceptors", interceptors); - } - } - /** * Adds extra post-processors to the context, to inject the objects configured by the MessageBus */ diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index d1e1e61933..9f4078fcc9 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -23,13 +23,6 @@ Defines the Message Bus for this Application Context. - - - - - - - diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index 72c85f7024..44fa623b5c 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -43,8 +43,8 @@ import org.springframework.util.ClassUtils; /** * @author Mark Fisher */ -public abstract class AbstractPollingEndpoint implements MessageEndpoint, - TaskSchedulerAware, Lifecycle, InitializingBean, BeanClassLoaderAware { +public abstract class AbstractPollingEndpoint extends AbstractEndpoint + implements TaskSchedulerAware, Lifecycle, InitializingBean, BeanClassLoaderAware { public static final int MAX_MESSAGES_UNBOUNDED = -1; diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java index 10c0a50a88..d12fae6079 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DefaultMessageBusTests.java @@ -99,6 +99,7 @@ public class DefaultMessageBusTests { QueueChannel targetChannel = new QueueChannel(); targetChannel.setBeanName("targetChannel"); context.getBeanFactory().registerSingleton("targetChannel", targetChannel); + context.refresh(); bus.start(); Message result = targetChannel.receive(100); assertNull(result); @@ -151,6 +152,7 @@ public class DefaultMessageBusTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); inputChannel.send(new StringMessage("testing")); Message message1 = outputChannel1.receive(500); @@ -193,6 +195,7 @@ public class DefaultMessageBusTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); inputChannel.send(new StringMessage("testing")); latch.await(500, TimeUnit.MILLISECONDS); @@ -221,6 +224,7 @@ public class DefaultMessageBusTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); Message message = errorChannel.receive(5000); @@ -255,6 +259,7 @@ public class DefaultMessageBusTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); errorChannel.send(new ErrorMessage(new RuntimeException("test-exception"))); latch.await(1000, TimeUnit.MILLISECONDS); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index c36a5f3213..d1f194405b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -72,6 +72,7 @@ public class DirectChannelSubscriptionTests { SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel); context.getBeanFactory().registerSingleton("testEndpoint", endpoint); bus.setApplicationContext(context); + context.refresh(); bus.start(); this.sourceChannel.send(new StringMessage("foo")); Message response = this.targetChannel.receive(); @@ -86,6 +87,7 @@ public class DirectChannelSubscriptionTests { postProcessor.afterPropertiesSet(); TestEndpoint endpoint = new TestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); + context.refresh(); bus.start(); this.sourceChannel.send(new StringMessage("foo")); Message response = this.targetChannel.receive(); @@ -103,6 +105,7 @@ public class DirectChannelSubscriptionTests { consumer.setOutputChannel(targetChannel); SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel); bus.registerEndpoint(endpoint); + context.refresh(); bus.start(); this.sourceChannel.send(new StringMessage("foo")); } @@ -118,6 +121,7 @@ public class DirectChannelSubscriptionTests { postProcessor.afterPropertiesSet(); FailingTestEndpoint endpoint = new FailingTestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); + context.refresh(); bus.start(); this.sourceChannel.send(new StringMessage("foo")); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java new file mode 100644 index 0000000000..652d145bd6 --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java @@ -0,0 +1,100 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.junit.Test; + +import org.springframework.beans.factory.support.RootBeanDefinition; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.integration.config.xml.MessageBusParser; +import org.springframework.integration.util.TestUtils; + +/** + * @author Marius Bogoevici + * @author Mark Fisher + */ +public class MessageBusEventTests { + + @Test + public void messageBusStartedEvent() { + GenericApplicationContext context = new GenericApplicationContext(); + context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusListener.class)); + ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); + messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); + messageBus.setApplicationContext(context); + context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener"); + assertNull(listener.startedBus); + assertNull(listener.stoppedBus); + context.refresh(); // bus will start + assertNotNull(listener.startedBus); + assertNull(listener.stoppedBus); + assertEquals(messageBus, listener.startedBus); + } + + @Test + public void messageBusStoppedEvent() { + GenericApplicationContext context = new GenericApplicationContext(); + context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusListener.class)); + ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); + messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); + messageBus.setApplicationContext(context); + context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener"); + assertNull(listener.startedBus); + assertNull(listener.stoppedBus); + context.refresh(); + assertNotNull(listener.startedBus); + assertNull(listener.stoppedBus); + messageBus.stop(); + assertNotNull(listener.stoppedBus); + assertEquals(messageBus, listener.stoppedBus); + } + + + public static class TestMessageBusListener implements ApplicationListener { + + private volatile MessageBus startedBus; + + private volatile MessageBus stoppedBus; + + + public MessageBus getStartedBus() { + return this.startedBus; + } + + public MessageBus getStoppedBus() { + return this.stoppedBus; + } + + public void onApplicationEvent(ApplicationEvent event) { + if (event instanceof MessageBusStartedEvent) { + this.startedBus = (MessageBus) event.getSource(); + } + if (event instanceof MessageBusStoppedEvent) { + this.stoppedBus = (MessageBus) event.getSource(); + } + } + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java deleted file mode 100644 index 21ee6deca1..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusInterceptorTests.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.bus; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.integration.bus.ApplicationContextMessageBus; -import org.springframework.integration.bus.MessageBus; -import org.springframework.integration.util.TestUtils; - -/** - * @author Marius Bogoevici - */ -public class MessageBusInterceptorTests { - - @Test - public void testStart() { - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(new GenericApplicationContext()); - TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor(); - TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor(); - // add all interceptors - messageBus.addInterceptor(startInterceptor); - messageBus.addInterceptor(stopInterceptor); - // check the state of the interceptors - executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor); - } - - public static void executeInterceptorsTest(MessageBus messageBus, TestMessageBusStartInterceptor startInterceptor, - TestMessageBusStopInterceptor stopInterceptor) { - assertTrue(!messageBus.isRunning()); - assertEquals(startInterceptor.getPreStartCounter().get(), 0); - assertEquals(startInterceptor.getPostStartCounter().get(), 0); - assertEquals(stopInterceptor.getPreStopCounter().get(), 0); - assertEquals(stopInterceptor.getPostStopCounter().get(), 0); - // start the bus - messageBus.start(); - // check the state of the interceptors - assertEquals(startInterceptor.getPreStartCounter().get(), 1); - assertEquals(startInterceptor.getPostStartCounter().get(), 1); - assertEquals(stopInterceptor.getPreStopCounter().get(), 0); - assertEquals(stopInterceptor.getPostStopCounter().get(), 0); - //stop the bus - messageBus.stop(); - //check the state of the interceptors - assertEquals(startInterceptor.getPreStartCounter().get(), 1); - assertEquals(startInterceptor.getPostStartCounter().get(), 1); - assertEquals(stopInterceptor.getPreStopCounter().get(), 1); - assertEquals(stopInterceptor.getPostStopCounter().get(), 1); - } - - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStartInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStartInterceptor.java deleted file mode 100644 index ae4aa8c176..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStartInterceptor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.bus; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.springframework.integration.bus.MessageBus; -import org.springframework.integration.bus.MessageBusInterceptorAdapter; - -/** - * @author Marius Bogoevici -*/ -public class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter { - - private AtomicInteger preStartCounter = new AtomicInteger(0); - - private AtomicInteger postStartCounter = new AtomicInteger(0); - - - public AtomicInteger getPreStartCounter() { - return preStartCounter; - } - - public AtomicInteger getPostStartCounter() { - return postStartCounter; - } - - @Override - public void preStart(MessageBus bus) { - this.preStartCounter.incrementAndGet(); - org.junit.Assert.assertTrue(!bus.isRunning()); - } - - @Override - public void postStart(MessageBus bus) { - this.postStartCounter.incrementAndGet(); - org.junit.Assert.assertTrue(bus.isRunning()); - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStopInterceptor.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStopInterceptor.java deleted file mode 100644 index 4e8ecabd16..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/TestMessageBusStopInterceptor.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.bus; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.springframework.integration.bus.MessageBus; -import org.springframework.integration.bus.MessageBusInterceptorAdapter; - -/** - * @author Marius Bogoevici -*/ -public class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter { - - private AtomicInteger preStopCounter = new AtomicInteger(0); - - private AtomicInteger postStopCounter = new AtomicInteger(0); - - - public AtomicInteger getPreStopCounter() { - return preStopCounter; - } - - public AtomicInteger getPostStopCounter() { - return postStopCounter; - } - - @Override - public void preStop(MessageBus bus) { - this.preStopCounter.incrementAndGet(); - org.junit.Assert.assertTrue(bus.isRunning()); - } - - @Override - public void postStop(MessageBus bus) { - this.postStopCounter.incrementAndGet(); - org.junit.Assert.assertTrue(!bus.isRunning()); - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java index aacd8c38d1..020299b88e 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java @@ -65,6 +65,7 @@ public class MessageChannelTemplateTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java index d2df0ac339..ef7e66a7af 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java @@ -18,6 +18,7 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import org.junit.Test; @@ -32,10 +33,8 @@ import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.core.task.SyncTaskExecutor; import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.bus.MessageBus; -import org.springframework.integration.bus.MessageBusInterceptorTests; import org.springframework.integration.bus.TestMessageBusAwareImpl; -import org.springframework.integration.bus.TestMessageBusStartInterceptor; -import org.springframework.integration.bus.TestMessageBusStopInterceptor; +import org.springframework.integration.bus.MessageBusEventTests.TestMessageBusListener; import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.scheduling.TaskScheduler; @@ -149,15 +148,30 @@ public class MessageBusParserTests { } @Test - public void testMessageBusWithInterceptors() { + public void testMessageBusEventListenerReceivesStartedEvent() { ApplicationContext context = new ClassPathXmlApplicationContext( - "messageBusWithInterceptors.xml", this.getClass()); + "messageBusWithListener.xml", this.getClass()); MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - TestMessageBusStartInterceptor startInterceptor = (TestMessageBusStartInterceptor) context.getBean( - "startInterceptor"); - TestMessageBusStopInterceptor stopInterceptor = (TestMessageBusStopInterceptor) context.getBean( - "stopInterceptor"); - MessageBusInterceptorTests.executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor); + TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener"); + assertNull(listener.getStartedBus()); + assertNull(listener.getStoppedBus()); + messageBus.start(); + assertNotNull(listener.getStartedBus()); + assertEquals(messageBus, listener.getStartedBus()); + assertNull(listener.getStoppedBus()); + } + + @Test + public void testMessageBusEventListenerReceivesStoppedEvent() { + ApplicationContext context = new ClassPathXmlApplicationContext( + "messageBusWithListener.xml", this.getClass()); + MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); + TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener"); + assertNull(listener.getStoppedBus()); + messageBus.start(); + messageBus.stop(); + assertNotNull(listener.getStoppedBus()); + assertEquals(messageBus, listener.getStoppedBus()); } @Test diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index 2a5164b6b8..49e88adcf8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -143,6 +143,7 @@ public class MessagingAnnotationPostProcessorTests { CountDownLatch latch = new CountDownLatch(1); OutboundChannelAdapterTestBean testBean = new OutboundChannelAdapterTestBean(latch); postProcessor.postProcessAfterInitialization(testBean, "testBean"); + context.refresh(); messageBus.start(); ChannelResolver channelResolver = new BeanFactoryChannelResolver(context); MessageChannel testChannel = channelResolver.resolveChannelName("testChannel"); @@ -186,6 +187,7 @@ public class MessagingAnnotationPostProcessorTests { MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); + context.refresh(); messageBus.start(); ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean(); postProcessor.postProcessAfterInitialization(bean, "testBean"); @@ -216,6 +218,7 @@ public class MessagingAnnotationPostProcessorTests { ProxyFactory proxyFactory = new ProxyFactory(new AnnotatedTestService()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); @@ -241,6 +244,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); @@ -268,6 +272,7 @@ public class MessagingAnnotationPostProcessorTests { ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); @@ -293,6 +298,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); @@ -318,6 +324,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); @@ -345,6 +352,7 @@ public class MessagingAnnotationPostProcessorTests { ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation()); Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); @@ -390,6 +398,7 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); + context.refresh(); messageBus.start(); ChannelResolver channelResolver = new BeanFactoryChannelResolver(context); DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel"); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java index 613ed05ccf..1e8624c7f7 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java @@ -66,6 +66,7 @@ public class RouterAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); TestRouter testRouter = new TestRouter(); postProcessor.postProcessAfterInitialization(testRouter, "test"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("foo")); Message replyMessage = outputChannel.receive(0); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java index 6c065eaa4f..ba56ee9fc0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java @@ -68,6 +68,7 @@ public class SplitterAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); TestSplitter splitter = new TestSplitter(); postProcessor.postProcessAfterInitialization(splitter, "testSplitter"); + context.refresh(); messageBus.start(); inputChannel.send(new StringMessage("this.is.a.test")); Message message1 = outputChannel.receive(500); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusElementAndBean.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusElementAndBean.xml index 280425ecce..53c3de920b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusElementAndBean.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusElementAndBean.xml @@ -9,6 +9,6 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithListener.xml similarity index 58% rename from org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml rename to org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithListener.xml index 60f48d2f36..6bb9033e52 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithInterceptors.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/messageBusWithListener.xml @@ -7,13 +7,8 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - - - - + - - - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java index 7b872a6cbc..018ba843ed 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingConsumerTests.java @@ -89,6 +89,7 @@ public class MethodInvokingConsumerTests { ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); bus.setApplicationContext(context); + context.refresh(); bus.start(); String result = queue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(result);