This commit is contained in:
Mark Fisher
2008-10-20 11:38:00 +00:00
parent e31520bc64
commit 25d0ac78b6
20 changed files with 183 additions and 336 deletions

View File

@@ -99,6 +99,7 @@ public class DefaultMessageBusTests {
QueueChannel targetChannel = new QueueChannel();
targetChannel.setBeanName("targetChannel");
context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
context.refresh();
bus.start();
Message<?> result = targetChannel.receive(100);
assertNull(result);
@@ -151,6 +152,7 @@ public class DefaultMessageBusTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
inputChannel.send(new StringMessage("testing"));
Message<?> message1 = outputChannel1.receive(500);
@@ -193,6 +195,7 @@ public class DefaultMessageBusTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
inputChannel.send(new StringMessage("testing"));
latch.await(500, TimeUnit.MILLISECONDS);
@@ -221,6 +224,7 @@ public class DefaultMessageBusTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message<?> message = errorChannel.receive(5000);
@@ -255,6 +259,7 @@ public class DefaultMessageBusTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));
latch.await(1000, TimeUnit.MILLISECONDS);

View File

@@ -72,6 +72,7 @@ public class DirectChannelSubscriptionTests {
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(serviceActivator, sourceChannel);
context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
bus.setApplicationContext(context);
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message<?> response = this.targetChannel.receive();
@@ -86,6 +87,7 @@ public class DirectChannelSubscriptionTests {
postProcessor.afterPropertiesSet();
TestEndpoint endpoint = new TestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message<?> response = this.targetChannel.receive();
@@ -103,6 +105,7 @@ public class DirectChannelSubscriptionTests {
consumer.setOutputChannel(targetChannel);
SubscribingConsumerEndpoint endpoint = new SubscribingConsumerEndpoint(consumer, sourceChannel);
bus.registerEndpoint(endpoint);
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
}
@@ -118,6 +121,7 @@ public class DirectChannelSubscriptionTests {
postProcessor.afterPropertiesSet();
FailingTestEndpoint endpoint = new FailingTestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
bus.start();
this.sourceChannel.send(new StringMessage("foo"));
}

View File

@@ -0,0 +1,100 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
* @author Mark Fisher
*/
public class MessageBusEventTests {
@Test
public void messageBusStartedEvent() {
GenericApplicationContext context = new GenericApplicationContext();
context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusListener.class));
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener");
assertNull(listener.startedBus);
assertNull(listener.stoppedBus);
context.refresh(); // bus will start
assertNotNull(listener.startedBus);
assertNull(listener.stoppedBus);
assertEquals(messageBus, listener.startedBus);
}
@Test
public void messageBusStoppedEvent() {
GenericApplicationContext context = new GenericApplicationContext();
context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusListener.class));
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(context);
context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener");
assertNull(listener.startedBus);
assertNull(listener.stoppedBus);
context.refresh();
assertNotNull(listener.startedBus);
assertNull(listener.stoppedBus);
messageBus.stop();
assertNotNull(listener.stoppedBus);
assertEquals(messageBus, listener.stoppedBus);
}
public static class TestMessageBusListener implements ApplicationListener {
private volatile MessageBus startedBus;
private volatile MessageBus stoppedBus;
public MessageBus getStartedBus() {
return this.startedBus;
}
public MessageBus getStoppedBus() {
return this.stoppedBus;
}
public void onApplicationEvent(ApplicationEvent event) {
if (event instanceof MessageBusStartedEvent) {
this.startedBus = (MessageBus) event.getSource();
}
if (event instanceof MessageBusStoppedEvent) {
this.stoppedBus = (MessageBus) event.getSource();
}
}
}
}

View File

@@ -1,72 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.util.TestUtils;
/**
* @author Marius Bogoevici
*/
public class MessageBusInterceptorTests {
@Test
public void testStart() {
ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
messageBus.setApplicationContext(new GenericApplicationContext());
TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor();
TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor();
// add all interceptors
messageBus.addInterceptor(startInterceptor);
messageBus.addInterceptor(stopInterceptor);
// check the state of the interceptors
executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor);
}
public static void executeInterceptorsTest(MessageBus messageBus, TestMessageBusStartInterceptor startInterceptor,
TestMessageBusStopInterceptor stopInterceptor) {
assertTrue(!messageBus.isRunning());
assertEquals(startInterceptor.getPreStartCounter().get(), 0);
assertEquals(startInterceptor.getPostStartCounter().get(), 0);
assertEquals(stopInterceptor.getPreStopCounter().get(), 0);
assertEquals(stopInterceptor.getPostStopCounter().get(), 0);
// start the bus
messageBus.start();
// check the state of the interceptors
assertEquals(startInterceptor.getPreStartCounter().get(), 1);
assertEquals(startInterceptor.getPostStartCounter().get(), 1);
assertEquals(stopInterceptor.getPreStopCounter().get(), 0);
assertEquals(stopInterceptor.getPostStopCounter().get(), 0);
//stop the bus
messageBus.stop();
//check the state of the interceptors
assertEquals(startInterceptor.getPreStartCounter().get(), 1);
assertEquals(startInterceptor.getPostStartCounter().get(), 1);
assertEquals(stopInterceptor.getPreStopCounter().get(), 1);
assertEquals(stopInterceptor.getPostStopCounter().get(), 1);
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.MessageBusInterceptorAdapter;
/**
* @author Marius Bogoevici
*/
public class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStartCounter = new AtomicInteger(0);
private AtomicInteger postStartCounter = new AtomicInteger(0);
public AtomicInteger getPreStartCounter() {
return preStartCounter;
}
public AtomicInteger getPostStartCounter() {
return postStartCounter;
}
@Override
public void preStart(MessageBus bus) {
this.preStartCounter.incrementAndGet();
org.junit.Assert.assertTrue(!bus.isRunning());
}
@Override
public void postStart(MessageBus bus) {
this.postStartCounter.incrementAndGet();
org.junit.Assert.assertTrue(bus.isRunning());
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.MessageBusInterceptorAdapter;
/**
* @author Marius Bogoevici
*/
public class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStopCounter = new AtomicInteger(0);
private AtomicInteger postStopCounter = new AtomicInteger(0);
public AtomicInteger getPreStopCounter() {
return preStopCounter;
}
public AtomicInteger getPostStopCounter() {
return postStopCounter;
}
@Override
public void preStop(MessageBus bus) {
this.preStopCounter.incrementAndGet();
org.junit.Assert.assertTrue(bus.isRunning());
}
@Override
public void postStop(MessageBus bus) {
this.postStopCounter.incrementAndGet();
org.junit.Assert.assertTrue(!bus.isRunning());
}
}

View File

@@ -65,6 +65,7 @@ public class MessageChannelTemplateTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
}

View File

@@ -18,6 +18,7 @@ package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
@@ -32,10 +33,8 @@ import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.MessageBusInterceptorTests;
import org.springframework.integration.bus.TestMessageBusAwareImpl;
import org.springframework.integration.bus.TestMessageBusStartInterceptor;
import org.springframework.integration.bus.TestMessageBusStopInterceptor;
import org.springframework.integration.bus.MessageBusEventTests.TestMessageBusListener;
import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.scheduling.TaskScheduler;
@@ -149,15 +148,30 @@ public class MessageBusParserTests {
}
@Test
public void testMessageBusWithInterceptors() {
public void testMessageBusEventListenerReceivesStartedEvent() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithInterceptors.xml", this.getClass());
"messageBusWithListener.xml", this.getClass());
MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
TestMessageBusStartInterceptor startInterceptor = (TestMessageBusStartInterceptor) context.getBean(
"startInterceptor");
TestMessageBusStopInterceptor stopInterceptor = (TestMessageBusStopInterceptor) context.getBean(
"stopInterceptor");
MessageBusInterceptorTests.executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor);
TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener");
assertNull(listener.getStartedBus());
assertNull(listener.getStoppedBus());
messageBus.start();
assertNotNull(listener.getStartedBus());
assertEquals(messageBus, listener.getStartedBus());
assertNull(listener.getStoppedBus());
}
@Test
public void testMessageBusEventListenerReceivesStoppedEvent() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithListener.xml", this.getClass());
MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
TestMessageBusListener listener = (TestMessageBusListener) context.getBean("listener");
assertNull(listener.getStoppedBus());
messageBus.start();
messageBus.stop();
assertNotNull(listener.getStoppedBus());
assertEquals(messageBus, listener.getStoppedBus());
}
@Test

View File

@@ -143,6 +143,7 @@ public class MessagingAnnotationPostProcessorTests {
CountDownLatch latch = new CountDownLatch(1);
OutboundChannelAdapterTestBean testBean = new OutboundChannelAdapterTestBean(latch);
postProcessor.postProcessAfterInitialization(testBean, "testBean");
context.refresh();
messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
MessageChannel testChannel = channelResolver.resolveChannelName("testChannel");
@@ -186,6 +187,7 @@ public class MessagingAnnotationPostProcessorTests {
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
context.refresh();
messageBus.start();
ServiceActivatorAnnotatedBean bean = new ServiceActivatorAnnotatedBean();
postProcessor.postProcessAfterInitialization(bean, "testBean");
@@ -216,6 +218,7 @@ public class MessagingAnnotationPostProcessorTests {
ProxyFactory proxyFactory = new ProxyFactory(new AnnotatedTestService());
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
@@ -241,6 +244,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
@@ -268,6 +272,7 @@ public class MessagingAnnotationPostProcessorTests {
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointSubclass());
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("world"));
Message<?> message = outputChannel.receive(1000);
@@ -293,6 +298,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
@@ -318,6 +324,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
@@ -345,6 +352,7 @@ public class MessagingAnnotationPostProcessorTests {
ProxyFactory proxyFactory = new ProxyFactory(new SimpleAnnotatedEndpointImplementation());
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message<?> message = outputChannel.receive(1000);
@@ -390,6 +398,7 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
context.refresh();
messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel");

View File

@@ -66,6 +66,7 @@ public class RouterAnnotationPostProcessorTests {
postProcessor.afterPropertiesSet();
TestRouter testRouter = new TestRouter();
postProcessor.postProcessAfterInitialization(testRouter, "test");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("foo"));
Message<?> replyMessage = outputChannel.receive(0);

View File

@@ -68,6 +68,7 @@ public class SplitterAnnotationPostProcessorTests {
postProcessor.afterPropertiesSet();
TestSplitter splitter = new TestSplitter();
postProcessor.postProcessAfterInitialization(splitter, "testSplitter");
context.refresh();
messageBus.start();
inputChannel.send(new StringMessage("this.is.a.test"));
Message<?> message1 = outputChannel.receive(500);

View File

@@ -9,6 +9,6 @@
<integration:message-bus/>
<bean id="bus" class="org.springframework.integration.bus.DefaultMessageBus"/>
<bean id="bus" class="org.springframework.integration.bus.ApplicationContextMessageBus"/>
</beans>

View File

@@ -7,13 +7,8 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
<message-bus auto-startup="false">
<interceptor ref="startInterceptor"/>
<interceptor ref="stopInterceptor"/>
</message-bus>
<message-bus auto-startup="false"/>
<beans:bean id="startInterceptor" class="org.springframework.integration.bus.TestMessageBusStartInterceptor"/>
<beans:bean id="stopInterceptor" class="org.springframework.integration.bus.TestMessageBusStopInterceptor"/>
<beans:bean id="listener" class="org.springframework.integration.bus.MessageBusEventTests$TestMessageBusListener"/>
</beans:beans>

View File

@@ -89,6 +89,7 @@ public class MethodInvokingConsumerTests {
ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
bus.setApplicationContext(context);
context.refresh();
bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(result);