Adds support for an <interceptor/> sub-element of <message-bus/>

This commit is contained in:
Marius Bogoevici
2008-06-22 17:36:45 +00:00
parent f3d2c23069
commit 490a3d9951
7 changed files with 171 additions and 70 deletions

View File

@@ -19,9 +19,8 @@ package org.springframework.integration.bus.interceptor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
import org.springframework.integration.bus.MessageBus;
/**
@@ -38,6 +37,12 @@ public class MessageBusInterceptorTests {
messageBus.addInterceptor(startInterceptor);
messageBus.addInterceptor(stopInterceptor);
// check the state of the interceptors
executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor);
}
public static void executeInterceptorsTest(MessageBus messageBus, TestMessageBusStartInterceptor startInterceptor,
TestMessageBusStopInterceptor stopInterceptor) {
assertTrue(!messageBus.isRunning());
assertEquals(startInterceptor.getPreStartCounter().get(), 0);
assertEquals(startInterceptor.getPostStartCounter().get(), 0);
assertEquals(stopInterceptor.getPreStopCounter().get(), 0);
@@ -57,64 +62,6 @@ public class MessageBusInterceptorTests {
assertEquals(stopInterceptor.getPreStopCounter().get(), 1);
assertEquals(stopInterceptor.getPostStopCounter().get(), 1);
}
private static class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStartCounter = new AtomicInteger(0);
private AtomicInteger postStartCounter = new AtomicInteger(0);
public AtomicInteger getPreStartCounter() {
return preStartCounter;
}
public AtomicInteger getPostStartCounter() {
return postStartCounter;
}
@Override
public void preStart(MessageBus bus) {
this.preStartCounter.incrementAndGet();
assertTrue(!bus.isRunning());
}
@Override
public void postStart(MessageBus bus) {
this.postStartCounter.incrementAndGet();
assertTrue(bus.isRunning());
}
}
private static class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStopCounter = new AtomicInteger(0);
private AtomicInteger postStopCounter = new AtomicInteger(0);
public AtomicInteger getPreStopCounter() {
return preStopCounter;
}
public AtomicInteger getPostStopCounter() {
return postStopCounter;
}
@Override
public void preStop(MessageBus bus) {
this.preStopCounter.incrementAndGet();
assertTrue(bus.isRunning());
}
@Override
public void postStop(MessageBus bus) {
this.postStopCounter.incrementAndGet();
assertTrue(!bus.isRunning());
}
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus.interceptor;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.bus.MessageBus;
/**
* @author Marius Bogoevici
*/
public class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStartCounter = new AtomicInteger(0);
private AtomicInteger postStartCounter = new AtomicInteger(0);
public AtomicInteger getPreStartCounter() {
return preStartCounter;
}
public AtomicInteger getPostStartCounter() {
return postStartCounter;
}
@Override
public void preStart(MessageBus bus) {
this.preStartCounter.incrementAndGet();
org.junit.Assert.assertTrue(!bus.isRunning());
}
@Override
public void postStart(MessageBus bus) {
this.postStartCounter.incrementAndGet();
org.junit.Assert.assertTrue(bus.isRunning());
}
}

View File

@@ -0,0 +1,53 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.bus.interceptor;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.integration.bus.MessageBus;
/**
* @author Marius Bogoevici
*/
public class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter {
private AtomicInteger preStopCounter = new AtomicInteger(0);
private AtomicInteger postStopCounter = new AtomicInteger(0);
public AtomicInteger getPreStopCounter() {
return preStopCounter;
}
public AtomicInteger getPostStopCounter() {
return postStopCounter;
}
@Override
public void preStop(MessageBus bus) {
this.preStopCounter.incrementAndGet();
org.junit.Assert.assertTrue(bus.isRunning());
}
@Override
public void postStop(MessageBus bus) {
this.postStopCounter.incrementAndGet();
org.junit.Assert.assertTrue(!bus.isRunning());
}
}

View File

@@ -33,6 +33,9 @@ import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.ConfigurationException;
import org.springframework.integration.bus.MessageBus;
import org.springframework.integration.bus.TestMessageBusAwareImpl;
import org.springframework.integration.bus.interceptor.MessageBusInterceptorTests;
import org.springframework.integration.bus.interceptor.TestMessageBusStartInterceptor;
import org.springframework.integration.bus.interceptor.TestMessageBusStopInterceptor;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dispatcher.DirectChannel;
import org.springframework.integration.endpoint.TargetEndpoint;
@@ -64,7 +67,7 @@ public class MessageBusParserTests {
assertNotNull("bus should have created a default error channel", bus.getErrorChannel());
}
@Test(expected=ConfigurationException.class)
@Test(expected = ConfigurationException.class)
public void testAutoCreateChannelsDisabledByDefault() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithDefaults.xml", this.getClass());
@@ -110,7 +113,7 @@ public class MessageBusParserTests {
// tries to get a reference to the message bus
assertEquals(BeanCreationException.class, e.getCause().getClass());
assertEquals(e.getBeanName(), MessageBusParser.MESSAGE_BUS_AWARE_POST_PROCESSOR_BEAN_NAME);
assertEquals(ConfigurationException.class, ((BeanCreationException) e.getCause()).getCause().getClass());
assertEquals(ConfigurationException.class, (e.getCause()).getCause().getClass());
assertEquals(((BeanCreationException) e.getCause()).getBeanName(), MessageBusParser.MESSAGE_BUS_BEAN_NAME);
}
assertTrue(exceptionThrown);
@@ -140,9 +143,9 @@ public class MessageBusParserTests {
"messageBusWithDefaultConcurrencyTests.xml", this.getClass());
TargetEndpoint endpoint2 = (TargetEndpoint) context.getBean("endpoint2");
assertEquals(14, endpoint2.getConcurrencyPolicy().getCoreSize());
assertEquals(17, endpoint2.getConcurrencyPolicy().getMaxSize());
assertEquals(17, endpoint2.getConcurrencyPolicy().getMaxSize());
}
@Test
public void testMessageBusAwareAutomaticallyAddedByNamespace() {
ApplicationContext context = new ClassPathXmlApplicationContext(
@@ -153,7 +156,7 @@ public class MessageBusParserTests {
@Test
public void testMessageBusWithChannelFactory() {
ApplicationContext context = new ClassPathXmlApplicationContext("messageBusWithChannelFactory.xml",
ApplicationContext context = new ClassPathXmlApplicationContext("messageBusWithChannelFactory.xml",
this.getClass());
assertEquals(DirectChannel.class, context.getBean("defaultTypeChannel").getClass());
assertEquals(QueueChannel.class, context.getBean("specifiedTypeChannel").getClass());
@@ -194,4 +197,16 @@ public class MessageBusParserTests {
assertEquals(SimpleMessagingTaskScheduler.class, taskExecutor.getClass());
}
@Test
public void testMessageBusWithInterceptors() {
ApplicationContext context = new ClassPathXmlApplicationContext(
"messageBusWithInterceptors.xml", this.getClass());
MessageBus messageBus = (MessageBus) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
TestMessageBusStartInterceptor startInterceptor = (TestMessageBusStartInterceptor) context.getBean(
"startInterceptor");
TestMessageBusStopInterceptor stopInterceptor = (TestMessageBusStopInterceptor) context.getBean(
"stopInterceptor");
MessageBusInterceptorTests.executeInterceptorsTest(messageBus, startInterceptor, stopInterceptor);
}
}

View File

@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-core-1.0.xsd">
<message-bus auto-startup="false">
<interceptor ref="startInterceptor"/>
<interceptor ref="stopInterceptor"/>
</message-bus>
<beans:bean id="startInterceptor" class="org.springframework.integration.bus.interceptor.TestMessageBusStartInterceptor"/>
<beans:bean id="stopInterceptor" class="org.springframework.integration.bus.interceptor.TestMessageBusStopInterceptor"/>
</beans:beans>