From 3f584ea2b7295b561e210bc169bd33591acec461 Mon Sep 17 00:00:00 2001 From: Marius Bogoevici Date: Tue, 17 Jun 2008 04:52:23 +0000 Subject: [PATCH] Adds a MessageBusInterceptor --- .../integration/bus/MessageBus.java | 146 ++++++++++++------ .../interceptor/MessageBusInterceptor.java | 37 +++++ .../MessageBusInterceptorAdapter.java | 41 +++++ .../MessageBusInterceptorTests.java | 120 ++++++++++++++ 4 files changed, 301 insertions(+), 43 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptor.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorAdapter.java create mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java index 9a66b9ac9d..0b4824bfc0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/MessageBus.java @@ -27,7 +27,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -39,6 +38,7 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.integration.ConfigurationException; +import org.springframework.integration.bus.interceptor.MessageBusInterceptor; import org.springframework.integration.channel.ChannelRegistry; import org.springframework.integration.channel.ChannelRegistryAware; import org.springframework.integration.channel.DefaultChannelRegistry; @@ -57,7 +57,6 @@ import org.springframework.integration.endpoint.TargetEndpoint; import org.springframework.integration.handler.MessageHandler; import org.springframework.integration.message.Target; import org.springframework.integration.scheduling.MessagePublishingErrorHandler; -import org.springframework.integration.scheduling.MessagingTask; import org.springframework.integration.scheduling.MessagingTaskScheduler; import org.springframework.integration.scheduling.Schedule; import org.springframework.integration.scheduling.SimpleMessagingTaskScheduler; @@ -71,15 +70,15 @@ import org.springframework.util.Assert; * @author Mark Fisher * @author Marius Bogoevici */ -public class MessageBus implements ChannelRegistry, EndpointRegistry, ApplicationContextAware, ApplicationListener, Lifecycle { +public class MessageBus implements ChannelRegistry, EndpointRegistry, ApplicationContextAware, ApplicationListener, + Lifecycle { public static final String ERROR_CHANNEL_NAME = "errorChannel"; private static final int DEFAULT_DISPATCHER_POOL_SIZE = 10; - private final Log logger = LogFactory.getLog(this.getClass()); - + private volatile ChannelFactory channelFactory = new QueueChannelFactory(); private final ChannelRegistry channelRegistry = new DefaultChannelRegistry(); @@ -90,6 +89,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio private final List lifecycleEndpoints = new CopyOnWriteArrayList(); + private final MessageBusInterceptorsList interceptors = new MessageBusInterceptorsList(); + private volatile MessagingTaskScheduler taskScheduler; private volatile ScheduledExecutorService executor; @@ -112,19 +113,17 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio private final Object lifecycleMonitor = new Object(); - /** * Set the {@link ChannelFactory} to use for auto-creating channels. */ public void setChannelFactory(ChannelFactory channelFactory) { this.channelFactory = channelFactory; } - + public ChannelFactory getChannelFactory() { return channelFactory; } - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Assert.notNull(applicationContext, "'applicationContext' must not be null"); if (applicationContext.getBeanNamesForType(this.getClass()).length > 1) { @@ -179,8 +178,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio @SuppressWarnings("unchecked") private void registerChannels(ApplicationContext context) { - Map channelBeans = - (Map) context.getBeansOfType(MessageChannel.class); + Map channelBeans = (Map) context + .getBeansOfType(MessageChannel.class); for (Map.Entry entry : channelBeans.entrySet()) { this.registerChannel(entry.getKey(), entry.getValue()); } @@ -188,8 +187,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio @SuppressWarnings("unchecked") private void registerEndpoints(ApplicationContext context) { - Map endpointBeans = - (Map) context.getBeansOfType(MessageEndpoint.class); + Map endpointBeans = (Map) context + .getBeansOfType(MessageEndpoint.class); for (Map.Entry entry : endpointBeans.entrySet()) { this.registerEndpoint(entry.getKey(), entry.getValue()); } @@ -197,8 +196,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio @SuppressWarnings("unchecked") private void registerGateways(ApplicationContext context) { - Map gatewayBeans = - (Map) context.getBeansOfType(MessagingGateway.class); + Map gatewayBeans = (Map) context + .getBeansOfType(MessagingGateway.class); for (Map.Entry entry : gatewayBeans.entrySet()) { this.registerGateway(entry.getKey(), entry.getValue()); } @@ -263,7 +262,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio this.registerHandler(name, handler, subscription, this.defaultConcurrencyPolicy); } - public void registerHandler(String name, MessageHandler handler, Subscription subscription, ConcurrencyPolicy concurrencyPolicy) { + public void registerHandler(String name, MessageHandler handler, Subscription subscription, + ConcurrencyPolicy concurrencyPolicy) { Assert.notNull(handler, "'handler' must not be null"); HandlerEndpoint endpoint = new HandlerEndpoint(handler); this.doRegisterEndpoint(name, endpoint, subscription, concurrencyPolicy); @@ -273,13 +273,15 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio this.registerTarget(name, target, subscription, this.defaultConcurrencyPolicy); } - public void registerTarget(String name, Target target, Subscription subscription, ConcurrencyPolicy concurrencyPolicy) { + public void registerTarget(String name, Target target, Subscription subscription, + ConcurrencyPolicy concurrencyPolicy) { Assert.notNull(target, "'target' must not be null"); TargetEndpoint endpoint = new TargetEndpoint(target); this.doRegisterEndpoint(name, endpoint, subscription, concurrencyPolicy); } - private void doRegisterEndpoint(String name, TargetEndpoint endpoint, Subscription subscription, ConcurrencyPolicy concurrencyPolicy) { + private void doRegisterEndpoint(String name, TargetEndpoint endpoint, Subscription subscription, + ConcurrencyPolicy concurrencyPolicy) { endpoint.setName(name); endpoint.setSubscription(subscription); endpoint.setConcurrencyPolicy(concurrencyPolicy); @@ -294,7 +296,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio ((ChannelRegistryAware) endpoint).setChannelRegistry(this.channelRegistry); } if (endpoint instanceof TargetEndpoint) { - this.registerTargetEndpoint(name, (TargetEndpoint) endpoint); + this.registerTargetEndpoint((TargetEndpoint) endpoint); } else if (endpoint instanceof SourceEndpoint) { this.registerSourceEndpoint(name, (SourceEndpoint) endpoint); @@ -308,7 +310,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } } - private void registerTargetEndpoint(String name, TargetEndpoint endpoint) { + private void registerTargetEndpoint(TargetEndpoint endpoint) { if (endpoint.getConcurrencyPolicy() == null && this.defaultConcurrencyPolicy != null) { endpoint.setConcurrencyPolicy(this.defaultConcurrencyPolicy); } @@ -359,26 +361,26 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio private void activateTargetEndpoint(TargetEndpoint endpoint) { Subscription subscription = endpoint.getSubscription(); if (subscription == null) { - throw new ConfigurationException("Unable to register endpoint '" + - endpoint + "'. No subscription information is available."); + throw new ConfigurationException("Unable to register endpoint '" + endpoint + + "'. No subscription information is available."); } MessageChannel channel = subscription.getChannel(); if (channel == null) { String channelName = subscription.getChannelName(); if (channelName == null) { - throw new ConfigurationException("endpoint '" + endpoint + - "' must provide either 'channel' or 'channelName' in its subscription metadata"); + throw new ConfigurationException("endpoint '" + endpoint + + "' must provide either 'channel' or 'channelName' in its subscription metadata"); } channel = this.lookupChannel(channelName); if (channel == null) { - if (this.autoCreateChannels == false) { - throw new ConfigurationException("Cannot activate subscription, unknown channel '" + channelName + - "'. Consider enabling the 'autoCreateChannels' option for the message bus."); + if (!this.autoCreateChannels) { + throw new ConfigurationException("Cannot activate subscription, unknown channel '" + channelName + + "'. Consider enabling the 'autoCreateChannels' option for the message bus."); } if (this.logger.isInfoEnabled()) { logger.info("auto-creating channel '" + channelName + "'"); } - channel = channelFactory.getChannel(null, null); + channel = channelFactory.getChannel(null, null); this.registerChannel(channelName, channel); } } @@ -387,24 +389,22 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio String outputChannelName = handlerEndpoint.getOutputChannelName(); if (outputChannelName != null && this.lookupChannel(outputChannelName) == null) { if (!this.autoCreateChannels) { - throw new ConfigurationException("Unknown channel '" + outputChannelName + - "' configured as output channel for endpoint '" + endpoint + - "'. Consider enabling the 'autoCreateChannels' option for the message bus."); + throw new ConfigurationException("Unknown channel '" + outputChannelName + + "' configured as output channel for endpoint '" + endpoint + + "'. Consider enabling the 'autoCreateChannels' option for the message bus."); } this.registerChannel(outputChannelName, new QueueChannel()); } } - if (endpoint instanceof TargetEndpoint) { - TargetEndpoint targetEndpoint = (TargetEndpoint) endpoint; - if (!targetEndpoint.hasErrorHandler() && this.getErrorChannel() != null && !this.getErrorChannel().equals(channel)) { - targetEndpoint.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel())); - } + if (!endpoint.hasErrorHandler() && this.getErrorChannel() != null && !this.getErrorChannel().equals(channel)) { + endpoint.setErrorHandler(new MessagePublishingErrorHandler(this.getErrorChannel())); } endpoint.afterPropertiesSet(); this.activateSubscription(channel, endpoint, subscription.getSchedule()); if (logger.isInfoEnabled()) { - logger.info("activated subscription to channel '" + channel.getName() + - "' for endpoint '" + endpoint + "'"); + logger + .info("activated subscription to channel '" + channel.getName() + "' for endpoint '" + endpoint + + "'"); } } @@ -412,9 +412,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio if (!this.initialized) { this.initialize(); } - if (endpoint instanceof MessagingTask) { - this.taskScheduler.schedule((MessagingTask) endpoint); - } + this.taskScheduler.schedule(endpoint); if (endpoint instanceof Lifecycle) { this.lifecycleEndpoints.add((Lifecycle) endpoint); if (this.isRunning()) { @@ -442,7 +440,8 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio SubscriptionManager manager = this.subscriptionManagers.get(channel); if (manager == null) { if (logger.isWarnEnabled()) { - logger.warn("no subscription manager available for channel '" + channel + "', be sure to register the channel"); + logger.warn("no subscription manager available for channel '" + channel + + "', be sure to register the channel"); } return; } @@ -465,6 +464,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio if (this.isRunning() || this.starting) { return; } + this.interceptors.preStart(); this.starting = true; synchronized (this.lifecycleMonitor) { this.activateEndpoints(); @@ -484,6 +484,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } this.running = true; this.starting = false; + this.interceptors.postStart(); if (logger.isInfoEnabled()) { logger.info("message bus started"); } @@ -493,6 +494,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio if (!this.isRunning()) { return; } + this.interceptors.preStop(); synchronized (this.lifecycleMonitor) { this.running = false; this.taskScheduler.stop(); @@ -509,6 +511,7 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio } } } + this.interceptors.postStop(); if (logger.isInfoEnabled()) { logger.info("message bus stopped"); } @@ -532,12 +535,69 @@ public class MessageBus implements ChannelRegistry, EndpointRegistry, Applicatio private void doConfigureAsyncEventMulticaster(ApplicationContext context) { String multicasterBeanName = AbstractApplicationContext.APPLICATION_EVENT_MULTICASTER_BEAN_NAME; if (context.containsBean(multicasterBeanName)) { - ApplicationEventMulticaster multicaster = - (ApplicationEventMulticaster) context.getBean(multicasterBeanName); + ApplicationEventMulticaster multicaster = (ApplicationEventMulticaster) context + .getBean(multicasterBeanName); if (multicaster instanceof SimpleApplicationEventMulticaster) { ((SimpleApplicationEventMulticaster) multicaster).setTaskExecutor(this.taskScheduler); } } } + public void addInterceptor(MessageBusInterceptor interceptor) { + this.interceptors.add(interceptor); + } + + public void removeInterceptor(MessageBusInterceptor interceptor) { + this.interceptors.remove(interceptor); + } + + public void setInterceptors(List interceptor) { + this.interceptors.set(interceptor); + } + + /* + * Wrapper class for the interceptor list + */ + private class MessageBusInterceptorsList { + + private CopyOnWriteArrayList messageBusInterceptors = new CopyOnWriteArrayList(); + + public void set(List interceptors) { + this.messageBusInterceptors.clear(); + this.messageBusInterceptors.addAll(interceptors); + } + + public void add(MessageBusInterceptor interceptor) { + this.messageBusInterceptors.add(interceptor); + } + + public void remove(MessageBusInterceptor interceptor) { + this.messageBusInterceptors.remove(interceptor); + } + + public void preStart() { + for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { + messageBusInterceptor.preStart(MessageBus.this); + } + } + + public void postStart() { + for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { + messageBusInterceptor.postStart(MessageBus.this); + } + } + + public void preStop() { + for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { + messageBusInterceptor.preStop(MessageBus.this); + } + } + + public void postStop() { + for (MessageBusInterceptor messageBusInterceptor : messageBusInterceptors) { + messageBusInterceptor.postStop(MessageBus.this); + } + } + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptor.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptor.java new file mode 100644 index 0000000000..12617a1a4e --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptor.java @@ -0,0 +1,37 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus.interceptor; + +import org.springframework.integration.bus.MessageBus; + +/** + * Interface for interceptors that are able be notified of the + * lifecycle of the {@link MessageBus Message Bus}. + * + * @author Marius Bogoevici + */ +public interface MessageBusInterceptor { + + void preStart(MessageBus bus); + + void postStart(MessageBus bus); + + void preStop(MessageBus bus); + + void postStop(MessageBus bus); + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorAdapter.java new file mode 100644 index 0000000000..a0d3f6d649 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorAdapter.java @@ -0,0 +1,41 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus.interceptor; + +import org.springframework.integration.bus.MessageBus; + +/** + * No-op implementation of a {@link MessageBusInterceptor}. Subclasses shall + * override only the methods for which they intend to provide behaviour. + * + * @author Marius Bogoevici + */ +public class MessageBusInterceptorAdapter implements MessageBusInterceptor { + + public void preStart(MessageBus bus) { + } + + public void postStart(MessageBus bus) { + } + + public void preStop(MessageBus bus) { + } + + public void postStop(MessageBus bus) { + } + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java new file mode 100644 index 0000000000..9a50264f9b --- /dev/null +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/interceptor/MessageBusInterceptorTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.bus.interceptor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.springframework.integration.bus.MessageBus; + +/** + * @author Marius Bogoevici + */ +public class MessageBusInterceptorTests { + + @Test + public void testStart() { + MessageBus messageBus = new MessageBus(); + TestMessageBusStartInterceptor startInterceptor = new TestMessageBusStartInterceptor(); + TestMessageBusStopInterceptor stopInterceptor = new TestMessageBusStopInterceptor(); + // add all interceptors + messageBus.addInterceptor(startInterceptor); + messageBus.addInterceptor(stopInterceptor); + // check the state of the interceptors + assertEquals(startInterceptor.getPreStartCounter().get(), 0); + assertEquals(startInterceptor.getPostStartCounter().get(), 0); + assertEquals(stopInterceptor.getPreStopCounter().get(), 0); + assertEquals(stopInterceptor.getPostStopCounter().get(), 0); + // start the bus + messageBus.start(); + // check the state of the interceptors + assertEquals(startInterceptor.getPreStartCounter().get(), 1); + assertEquals(startInterceptor.getPostStartCounter().get(), 1); + assertEquals(stopInterceptor.getPreStopCounter().get(), 0); + assertEquals(stopInterceptor.getPostStopCounter().get(), 0); + //stop the bus + messageBus.stop(); + //check the state of the interceptors + assertEquals(startInterceptor.getPreStartCounter().get(), 1); + assertEquals(startInterceptor.getPostStartCounter().get(), 1); + assertEquals(stopInterceptor.getPreStopCounter().get(), 1); + assertEquals(stopInterceptor.getPostStopCounter().get(), 1); + } + + + private static class TestMessageBusStartInterceptor extends MessageBusInterceptorAdapter { + + private AtomicInteger preStartCounter = new AtomicInteger(0); + + private AtomicInteger postStartCounter = new AtomicInteger(0); + + + public AtomicInteger getPreStartCounter() { + return preStartCounter; + } + + public AtomicInteger getPostStartCounter() { + return postStartCounter; + } + + @Override + public void preStart(MessageBus bus) { + this.preStartCounter.incrementAndGet(); + assertTrue(!bus.isRunning()); + } + + @Override + public void postStart(MessageBus bus) { + this.postStartCounter.incrementAndGet(); + assertTrue(bus.isRunning()); + } + + } + + private static class TestMessageBusStopInterceptor extends MessageBusInterceptorAdapter { + + private AtomicInteger preStopCounter = new AtomicInteger(0); + + private AtomicInteger postStopCounter = new AtomicInteger(0); + + + public AtomicInteger getPreStopCounter() { + return preStopCounter; + } + + public AtomicInteger getPostStopCounter() { + return postStopCounter; + } + + @Override + public void preStop(MessageBus bus) { + this.preStopCounter.incrementAndGet(); + assertTrue(bus.isRunning()); + } + + @Override + public void postStop(MessageBus bus) { + this.postStopCounter.incrementAndGet(); + assertTrue(!bus.isRunning()); + } + + } + +}