From f4ccde62578fdaa42bf4965f3d60ec0e2cda3159 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Tue, 11 Nov 2008 20:11:21 +0000 Subject: [PATCH] Message Endpoints and the SimpleTaskScheduler now manage their own lifecycles. The ApplicationContextMessageBus is no longer necessary (part of INT-462). The MessagePublishingErrorHandler now detects the default error channel within the beanFactory if necessary (INT-464). --- ...ApplicationEventInboundChannelAdapter.java | 11 +- ...cationEventInboundChannelAdapterTests.java | 10 +- .../HttpInvokerInboundGateway.java | 6 +- .../integration/jms/JmsInboundGateway.java | 15 +- .../JmsInboundChannelAdapterParserTests.java | 5 +- .../jmsGatewayWithContainerSettings.xml | 2 - ...msGatewaysWithExtractPayloadAttributes.xml | 2 - .../jmsOutboundGatewayWithConverter.xml | 2 - .../mail/ImapIdleChannelAdapter.java | 45 ++--- .../config/pollingMailSourceParserTests.xml | 2 +- .../integration/rmi/RmiInboundGateway.java | 3 +- ...nsoleInboundChannelAdapterParserTests.java | 4 +- ...soleOutboundChannelAdapterParserTests.java | 4 +- ...onsoleInboundChannelAdapterParserTests.xml | 4 +- .../bus/ApplicationContextMessageBus.java | 172 +---------------- .../MessagePublishingErrorHandler.java | 5 + .../config/ConsumerEndpointFactoryBean.java | 44 +++-- ...AbstractMethodAnnotationPostProcessor.java | 10 - ...ChannelAdapterAnnotationPostProcessor.java | 15 ++ .../MessagingAnnotationPostProcessor.java | 35 +++- .../xml/AbstractConsumerEndpointParser.java | 1 + ...actPollingInboundChannelAdapterParser.java | 5 + .../config/xml/IntegrationNamespaceUtils.java | 36 ++++ .../config/xml/MessageBusParser.java | 77 ++------ .../config/xml/spring-integration-1.0.xsd | 3 +- .../context/IntegrationContextUtils.java | 78 ++++++++ .../endpoint/AbstractEndpoint.java | 47 ++++- .../endpoint/AbstractPollingEndpoint.java | 58 ++---- .../endpoint/EventDrivenConsumer.java | 33 +--- .../endpoint/MessageProducerSupport.java | 6 +- .../endpoint/SourcePollingChannelAdapter.java | 5 +- .../gateway/AbstractMessagingGateway.java | 41 ++-- .../gateway/GatewayProxyFactoryBean.java | 58 +++--- .../scheduling/SimpleTaskScheduler.java | 84 ++++----- .../integration/util/LifecycleSupport.java | 142 ++++++++++++++ .../ApplicationContextMessageBusTests.java | 121 ++++-------- .../bus/DirectChannelSubscriptionTests.java | 51 +++-- .../integration/bus/MessageBusEventTests.java | 100 ---------- .../integration/bus/messageBusTests.xml | 10 +- .../bus/multipleMessageBusBeans.xml | 11 -- .../channel/MessageChannelTemplateTests.java | 21 +-- .../ChannelAdapterParserTests-context.xml | 4 +- .../config/ChannelAdapterParserTests.java | 31 +-- .../config/MessageBusParserTests.java | 83 -------- ...ActivatorAnnotationPostProcessorTests.java | 15 +- .../AnnotatedEndpointActivationTests.java | 14 +- ...MessagingAnnotationPostProcessorTests.java | 177 +++++------------- .../RouterAnnotationPostProcessorTests.java | 20 +- .../SplitterAnnotationPostProcessorTests.java | 21 +-- .../gateway/SimpleMessagingGatewayTests.java | 2 + .../MethodInvokingMessageHandlerTests.java | 16 +- .../integration/util/TestUtils.java | 68 +++++++ 52 files changed, 759 insertions(+), 1076 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java delete mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java delete mode 100644 org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml diff --git a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java index 50557ae6f8..4c1cc4cb9e 100644 --- a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java +++ b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java @@ -50,7 +50,8 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor } } - public void onApplicationEvent(ApplicationEvent event) { + @Override + protected void onEvent(ApplicationEvent event) { if (CollectionUtils.isEmpty(this.eventTypes)) { this.sendEventAsMessage(event); return; @@ -67,4 +68,12 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor return this.sendMessage(MessageBuilder.withPayload(event).build()); } + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + } diff --git a/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java b/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java index 8a78f2b8ae..89b8a6e331 100644 --- a/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java +++ b/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java @@ -30,8 +30,6 @@ import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.context.event.ContextStartedEvent; import org.springframework.context.event.ContextStoppedEvent; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.integration.bus.MessageBusStartedEvent; -import org.springframework.integration.bus.MessageBusStoppedEvent; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.Message; @@ -76,26 +74,20 @@ public class ApplicationEventInboundChannelAdapterTests { } @Test - public void messageBusAndApplicationContextEvents() { + public void applicationContextEvents() { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "applicationEventInboundChannelAdapterTests.xml", this.getClass()); PollableChannel channel = (PollableChannel) context.getBean("channel"); - Message busStartedEventMessage = channel.receive(0); Message contextRefreshedEventMessage = channel.receive(0); - assertNotNull(busStartedEventMessage); assertNotNull(contextRefreshedEventMessage); - assertEquals(MessageBusStartedEvent.class, busStartedEventMessage.getPayload().getClass()); assertEquals(ContextRefreshedEvent.class, contextRefreshedEventMessage.getPayload().getClass()); context.start(); Message startedEventMessage = channel.receive(0); assertNotNull(startedEventMessage); assertEquals(ContextStartedEvent.class, startedEventMessage.getPayload().getClass()); context.stop(); - Message busStoppedEventMessage = channel.receive(0); Message contextStoppedEventMessage = channel.receive(0); - assertNotNull(busStoppedEventMessage); assertNotNull(contextStoppedEventMessage); - assertEquals(MessageBusStoppedEvent.class, busStoppedEventMessage.getPayload().getClass()); assertEquals(ContextStoppedEvent.class, contextStoppedEventMessage.getPayload().getClass()); context.close(); Message closedEventMessage = channel.receive(0); diff --git a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java index 6b9328a48b..432cdb2500 100644 --- a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java +++ b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java @@ -22,7 +22,6 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.adapter.RemoteMessageHandler; import org.springframework.integration.adapter.RemotingInboundGatewaySupport; import org.springframework.integration.core.MessagingException; @@ -59,12 +58,13 @@ import org.springframework.web.HttpRequestHandler; * * @author Mark Fisher */ -public class HttpInvokerInboundGateway extends RemotingInboundGatewaySupport implements HttpRequestHandler, InitializingBean { +public class HttpInvokerInboundGateway extends RemotingInboundGatewaySupport implements HttpRequestHandler { private volatile HttpInvokerServiceExporter exporter; - public void afterPropertiesSet() { + @Override + protected void onInit() { HttpInvokerServiceExporter exporter = new HttpInvokerServiceExporter(); exporter.setService(this); exporter.setServiceInterface(RemoteMessageHandler.class); diff --git a/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java b/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java index e22e7d8a27..9b21fa5917 100644 --- a/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java +++ b/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java @@ -23,7 +23,6 @@ import javax.jms.MessageProducer; import javax.jms.Session; import org.springframework.beans.factory.DisposableBean; -import org.springframework.context.Lifecycle; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.core.Message; import org.springframework.integration.gateway.SimpleMessagingGateway; @@ -41,7 +40,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class JmsInboundGateway extends SimpleMessagingGateway implements Lifecycle, DisposableBean { +public class JmsInboundGateway extends SimpleMessagingGateway implements DisposableBean { private volatile AbstractMessageListenerContainer container; @@ -190,19 +189,19 @@ public class JmsInboundGateway extends SimpleMessagingGateway implements Lifecyc // Lifecycle implementation - public boolean isRunning() { - return (this.container != null && this.container.isRunning()); - } - - public void start() { + @Override + protected void doStart() { this.initialize(); this.container.start(); + super.doStart(); } - public void stop() { + @Override + protected void doStop() { if (this.container != null) { this.container.stop(); } + super.doStop(); } // DisposableBean implementation diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java index 9878db9da0..c113931550 100644 --- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java +++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java @@ -81,8 +81,9 @@ public class JmsInboundChannelAdapterParserTests { try { new ClassPathXmlApplicationContext("jmsInboundWithDestinationOnly.xml", this.getClass()); } - catch (RuntimeException e) { - assertEquals(NoSuchBeanDefinitionException.class, e.getCause().getClass()); + catch (BeanCreationException e) { + Throwable rootCause = e.getRootCause(); + assertEquals(NoSuchBeanDefinitionException.class, rootCause.getClass()); throw e; } } diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml index a5895c10ce..7c04cfe1f5 100644 --- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml +++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml @@ -10,8 +10,6 @@ http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd"> - - diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml index 34aec969a7..4b81321099 100644 --- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml +++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml @@ -10,8 +10,6 @@ http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd"> - - diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml index 40c13d5505..0c04fc0422 100644 --- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml +++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml @@ -10,8 +10,6 @@ http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd"> - - diff --git a/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java b/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java index 2859336e9c..2ac616014f 100755 --- a/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java +++ b/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java @@ -20,7 +20,6 @@ import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; -import org.springframework.context.Lifecycle; import org.springframework.core.task.SimpleAsyncTaskExecutor; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.endpoint.MessageProducerSupport; @@ -37,16 +36,12 @@ import org.springframework.util.Assert; * @author Arjen Poutsma * @author Mark Fisher */ -public class ImapIdleChannelAdapter extends MessageProducerSupport implements Lifecycle { +public class ImapIdleChannelAdapter extends MessageProducerSupport { private final IdleTask idleTask = new IdleTask(); private volatile TaskExecutor taskExecutor; - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - private final ImapMailReceiver mailReceiver; @@ -70,33 +65,21 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Li * Lifecycle implementation */ - public boolean isRunning() { - return this.running; + @Override // guarded by super#lifecycleLock + protected void doStart() { + if (this.taskExecutor == null) { + if (logger.isInfoEnabled()) { + logger.info("No TaskExecutor has been provided, will use a [" + + SimpleAsyncTaskExecutor.class + "] as the default."); + } + this.taskExecutor = new SimpleAsyncTaskExecutor(); + } + this.taskExecutor.execute(this.idleTask); } - public void start() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - if (this.taskExecutor == null) { - if (logger.isInfoEnabled()) { - logger.info("No TaskExecutor has been provided, will use a [" - + SimpleAsyncTaskExecutor.class + "] as the default."); - } - this.taskExecutor = new SimpleAsyncTaskExecutor(); - } - this.taskExecutor.execute(this.idleTask); - } - this.running = true; - } - } - - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - this.idleTask.interrupt(); - } - this.running = false; - } + @Override // guarded by super#lifecycleLock + protected void doStop() { + this.idleTask.interrupt(); } diff --git a/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml b/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml index 0b96d28d40..31b23ef20f 100644 --- a/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml +++ b/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml @@ -10,6 +10,6 @@ http://www.springframework.org/schema/integration/mail http://www.springframework.org/schema/integration/mail/spring-integration-mail-1.0.xsd"> - + diff --git a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java index 16f1c3de1f..e52a837c5e 100644 --- a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java +++ b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java @@ -72,7 +72,8 @@ public class RmiInboundGateway extends RemotingInboundGatewaySupport implements this.remoteInvocationExecutor = remoteInvocationExecutor; } - public void afterPropertiesSet() throws RemoteException { + @Override + protected void onInit() throws RemoteException { RmiServiceExporter exporter = new RmiServiceExporter(); if (this.registryHost != null) { exporter.setRegistryHost(this.registryHost); diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java index af009ef68b..1611bda56e 100644 --- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java +++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java @@ -97,9 +97,7 @@ public class ConsoleInboundChannelAdapterParserTests { catch (BeanCreationException e) { beanCreationException = e; } - Throwable parentCause = beanCreationException.getCause().getCause(); - assertEquals(IllegalArgumentException.class, parentCause.getClass()); - Throwable rootCause = ((IllegalArgumentException) parentCause).getCause(); + Throwable rootCause = beanCreationException.getRootCause(); assertEquals(UnsupportedEncodingException.class, rootCause.getClass()); } diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java index 08645c3014..4ed6050bf5 100644 --- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java +++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java @@ -108,9 +108,7 @@ public class ConsoleOutboundChannelAdapterParserTests { catch (BeanCreationException e) { beanCreationException = e; } - Throwable parentCause = beanCreationException.getCause().getCause(); - assertEquals(IllegalArgumentException.class, parentCause.getClass()); - Throwable rootCause = ((IllegalArgumentException) parentCause).getCause(); + Throwable rootCause = beanCreationException.getRootCause(); assertEquals(UnsupportedEncodingException.class, rootCause.getClass()); } diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml index 46c54a7bb5..238f9cdd66 100644 --- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml +++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml @@ -10,8 +10,8 @@ http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd"> - + - + \ No newline at end of file diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java index a8690fc447..f9640d5b24 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java @@ -16,175 +16,7 @@ package org.springframework.integration.bus; -import java.util.Collection; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.BeansException; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.context.Lifecycle; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.integration.endpoint.MessageEndpoint; -import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.scheduling.TaskSchedulerAware; -import org.springframework.util.Assert; - -/** - * Spring Integration's standard Message Bus implementation. Serves as a - * registry for Messages Endpoints. Manages their lifecycle, activates - * subscriptions, and schedules pollers by delegating to a - * {@link TaskScheduler}. Retrieves MessageChannels from the - * ApplicationContext based on bean name. - * - * @author Mark Fisher - * @author Marius Bogoevici - */ -public class ApplicationContextMessageBus implements ApplicationContextAware, ApplicationListener, Lifecycle, DisposableBean { - - public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"; - - - private final Log logger = LogFactory.getLog(this.getClass()); - - private volatile TaskScheduler taskScheduler; - - private volatile ApplicationContext applicationContext; - - private volatile boolean autoStartup = true; - - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - - - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - Assert.notNull(applicationContext, "'applicationContext' must not be null"); - Assert.state(!(applicationContext.getBeanNamesForType(this.getClass()).length > 1), - "Only one instance of '" + this.getClass().getSimpleName() + "' is allowed per ApplicationContext."); - this.applicationContext = applicationContext; - } - - /** - * Set the {@link TaskScheduler} to use for scheduling message dispatchers. - */ - public void setTaskScheduler(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; - } - - /** - * Set whether to automatically start the bus after initialization. - *

Default is 'true'; set this to 'false' to allow for manual startup - * through the {@link #start()} method. - */ - public void setAutoStartup(boolean autoStartup) { - this.autoStartup = autoStartup; - } - - private Collection getEndpoints() { - GenericBeanFactoryAccessor accessor = new GenericBeanFactoryAccessor(this.applicationContext); - return accessor.getBeansOfType(MessageEndpoint.class).values(); - } - - private void activateEndpoints() { - for (MessageEndpoint endpoint : this.getEndpoints()) { - if (endpoint != null) { - this.activateEndpoint(endpoint); - } - } - } - - private void deactivateEndpoints() { - for (MessageEndpoint endpoint : this.getEndpoints()) { - if (endpoint != null) { - this.deactivateEndpoint(endpoint); - } - } - } - - private void activateEndpoint(MessageEndpoint endpoint) { - Assert.notNull(endpoint, "'endpoint' must not be null"); - if (endpoint instanceof TaskSchedulerAware) { - ((TaskSchedulerAware) endpoint).setTaskScheduler(this.taskScheduler); - } - if (endpoint instanceof Lifecycle) { - ((Lifecycle) endpoint).start(); - } - if (logger.isInfoEnabled()) { - logger.info("activated endpoint '" + endpoint + "'"); - } - } - - private void deactivateEndpoint(MessageEndpoint endpoint) { - Assert.notNull(endpoint, "'endpoint' must not be null"); - if (endpoint instanceof Lifecycle) { - ((Lifecycle) endpoint).stop(); - if (this.logger.isInfoEnabled()) { - logger.info("deactivated endpoint '" + endpoint + "'"); - } - } - } - - // Lifecycle implementation - - public boolean isRunning() { - synchronized (this.lifecycleMonitor) { - return this.running; - } - } - - public void start() { - if (this.running) { - return; - } - Assert.notNull(this.applicationContext, "ApplicationContext must not be null"); - Assert.notNull(this.taskScheduler, "TaskScheduler must not be null"); - synchronized (this.lifecycleMonitor) { - this.activateEndpoints(); - this.taskScheduler.start(); - } - this.running = true; - this.applicationContext.publishEvent(new MessageBusStartedEvent(this)); - if (logger.isInfoEnabled()) { - logger.info("message bus started"); - } - } - - public void stop() { - if (!this.isRunning()) { - return; - } - synchronized (this.lifecycleMonitor) { - this.deactivateEndpoints(); - this.running = false; - this.taskScheduler.stop(); - } - this.applicationContext.publishEvent(new MessageBusStoppedEvent(this)); - if (logger.isInfoEnabled()) { - logger.info("message bus stopped"); - } - } - - // ApplicationListener implementation - - public void onApplicationEvent(ApplicationEvent event) { - if (event instanceof ContextRefreshedEvent && this.autoStartup) { - this.start(); - } - } - - // DisposableBean implementation - - public void destroy() throws Exception { - this.stop(); - if (this.taskScheduler instanceof DisposableBean) { - ((DisposableBean) this.taskScheduler).destroy(); - } - } +// TODO: placeholder only, delete after handling all dependencies +public class ApplicationContextMessageBus { } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java index 4b0ef49903..526a5ebaee 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessagingException; @@ -97,6 +98,10 @@ public class MessagePublishingErrorHandler implements ErrorHandler, BeanFactoryA } private MessageChannel resolveErrorChannel(Message failedMessage) { + if (this.defaultErrorChannel == null && this.channelResolver != null) { + this.defaultErrorChannel = this.channelResolver.resolveChannelName( + IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME); + } if (failedMessage == null || failedMessage.getHeaders().getErrorChannel() == null) { return this.defaultErrorChannel; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 3857b0ce96..77cbc072ad 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -22,11 +22,13 @@ import org.springframework.beans.factory.BeanNameAware; import org.springframework.beans.factory.FactoryBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.config.ConfigurableBeanFactory; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.core.MessageChannel; -import org.springframework.integration.endpoint.MessageEndpoint; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.message.MessageHandler; @@ -39,7 +41,7 @@ import org.springframework.util.Assert; /** * @author Mark Fisher */ -public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, InitializingBean { +public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, InitializingBean, ApplicationListener { private final MessageHandler handler; @@ -61,7 +63,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar private volatile ConfigurableBeanFactory beanFactory; - private volatile MessageEndpoint endpoint; + private volatile AbstractEndpoint endpoint; private volatile boolean initialized; @@ -74,10 +76,6 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar } - public void setBeanName(String beanName) { - this.beanName = beanName; - } - public void setInputChannelName(String inputChannelName) { this.inputChannelName = inputChannelName; } @@ -106,14 +104,22 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar this.transactionDefinition = transactionDefinition; } + public void setBeanName(String beanName) { + this.beanName = beanName; + } + public void setBeanFactory(BeanFactory beanFactory) { Assert.isInstanceOf(ConfigurableBeanFactory.class, beanFactory, "a ConfigurableBeanFactory is required"); this.beanFactory = (ConfigurableBeanFactory) beanFactory; } - public void afterPropertiesSet() { - Assert.hasText(this.inputChannelName, "inputChannelName is required"); + public void afterPropertiesSet() throws Exception { + this.initializeEndpoint(); + } + + public boolean isSingleton() { + return true; } public Object getObject() throws Exception { @@ -125,20 +131,17 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar public Class getObjectType() { if (this.endpoint == null) { - return MessageEndpoint.class; + return AbstractEndpoint.class; } - return endpoint.getClass(); + return this.endpoint.getClass(); } - public boolean isSingleton() { - return true; - } - - private void initializeEndpoint() { + private void initializeEndpoint() throws Exception { synchronized (this.initializationMonitor) { if (this.initialized) { return; } + Assert.hasText(this.inputChannelName, "inputChannelName is required"); Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName), "no such input channel '" + this.inputChannelName + "' for endpoint '" + this.beanName + "'"); MessageChannel channel = (MessageChannel) @@ -166,8 +169,17 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar throw new IllegalArgumentException( "unsupported channel type: [" + channel.getClass() + "]"); } + this.endpoint.setBeanName(this.beanName); + this.endpoint.setBeanFactory(this.beanFactory); + if (this.endpoint instanceof InitializingBean) { + ((InitializingBean) this.endpoint).afterPropertiesSet(); + } this.initialized = true; } } + public void onApplicationEvent(ApplicationEvent event) { + this.endpoint.onApplicationEvent(event); + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java index 7c7059d163..8d2d9bac51 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java @@ -20,7 +20,6 @@ import java.lang.annotation.Annotation; import java.lang.reflect.Method; import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.ListableBeanFactory; import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor; import org.springframework.core.annotation.AnnotationUtils; @@ -64,15 +63,6 @@ public abstract class AbstractMethodAnnotationPostProcessor, MethodAnnotationPostProcessor> postProcessors = new HashMap, MethodAnnotationPostProcessor>(); + private Set listeners = new HashSet(); + public void setBeanFactory(BeanFactory beanFactory) { Assert.isAssignable(ConfigurableListableBeanFactory.class, beanFactory.getClass(), @@ -74,7 +77,6 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean public void afterPropertiesSet() { Assert.notNull(this.beanFactory, "BeanFactory must not be null"); - this.messageBus = (Lifecycle) this.beanFactory.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory)); postProcessors.put(ChannelAdapter.class, new ChannelAdapterAnnotationPostProcessor(this.beanFactory)); postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory)); @@ -108,8 +110,19 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean ((BeanNameAware) result).setBeanName(endpointBeanName); } beanFactory.registerSingleton(endpointBeanName, result); - if (messageBus.isRunning() && result instanceof Lifecycle) { - ((Lifecycle) result).start(); + if (result instanceof BeanFactoryAware) { + ((BeanFactoryAware) result).setBeanFactory(beanFactory); + } + if (result instanceof InitializingBean) { + try { + ((InitializingBean) result).afterPropertiesSet(); + } + catch (Exception e) { + throw new BeanInitializationException("failed to initialize annotated component", e); + } + } + if (result instanceof ApplicationListener) { + listeners.add((ApplicationListener) result); } } } @@ -158,4 +171,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean return name; } + public void onApplicationEvent(ApplicationEvent event) { + for (ApplicationListener listener : listeners) { + listener.onApplicationEvent(event); + } + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java index cf6e654fb2..8fa97b4988 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java @@ -87,6 +87,7 @@ public abstract class AbstractConsumerEndpointParser extends AbstractSingleBeanD @Override protected final void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { + IntegrationNamespaceUtils.registerTaskSchedulerIfNecessary(parserContext.getRegistry()); BeanDefinitionBuilder consumerBuilder = this.parseConsumer(element, parserContext); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, OUTPUT_CHANNEL_ATTRIBUTE); IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, SELECTOR_ATTRIBUTE); diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java index 5caffe7cf8..98e52d242b 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java @@ -23,6 +23,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.integration.util.LifecycleSupport.AutoStartMode; import org.springframework.util.Assert; import org.springframework.util.xml.DomUtils; @@ -52,6 +53,10 @@ public abstract class AbstractPollingInboundChannelAdapterParser extends Abstrac else { adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval())); } + String autoStart = element.getAttribute("auto-startup"); + if ("false".equals(autoStart)) { + adapterBuilder.addPropertyValue("autoStartMode", AutoStartMode.NONE); + } return adapterBuilder.getBeanDefinition(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java index 61f66d1172..cb05d7adb0 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java @@ -23,11 +23,19 @@ import org.w3c.dom.Element; import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; +import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; +import org.springframework.beans.factory.support.BeanDefinitionRegistry; +import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.core.Conventions; +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.MessagePublishingErrorHandler; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.scheduling.CronTrigger; import org.springframework.integration.scheduling.IntervalTrigger; +import org.springframework.integration.scheduling.SimpleTaskScheduler; import org.springframework.integration.scheduling.Trigger; import org.springframework.transaction.support.DefaultTransactionDefinition; import org.springframework.util.Assert; @@ -195,4 +203,32 @@ public abstract class IntegrationNamespaceUtils { targetBuilder.addPropertyValue("transactionDefinition", txDefinition); } + /** + * Register a TaskScheduler in the given BeanDefinitionRegistry if not yet present. + * The bean name for which this is checking is defined by the constant + * {@link IntegrationContextUtils#TASK_SCHEDULER_BEAN_NAME}. + */ + public static synchronized void registerTaskSchedulerIfNecessary(BeanDefinitionRegistry registry) { + if (!registry.containsBeanDefinition(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) { + RootBeanDefinition errorChannelDef = new RootBeanDefinition(QueueChannel.class); + BeanDefinitionHolder errorChannelHolder = new BeanDefinitionHolder( + errorChannelDef, IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME); + BeanDefinitionReaderUtils.registerBeanDefinition(errorChannelHolder, registry); + } + TaskExecutor taskExecutor = null; + if (!registry.containsBeanDefinition(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)) { + taskExecutor = IntegrationContextUtils.createTaskExecutor(2, 100, 0, "integration-main-"); + BeanDefinitionBuilder schedulerBuilder = BeanDefinitionBuilder.genericBeanDefinition(SimpleTaskScheduler.class); + schedulerBuilder.addConstructorArgValue(taskExecutor); + BeanDefinitionBuilder errorHandlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MessagePublishingErrorHandler.class); + errorHandlerBuilder.addPropertyReference("defaultErrorChannel", IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME); + String errorHandlerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( + errorHandlerBuilder.getBeanDefinition(), registry); + schedulerBuilder.addPropertyReference("errorHandler", errorHandlerBeanName); + BeanDefinitionHolder schedulerHolder = new BeanDefinitionHolder( + schedulerBuilder.getBeanDefinition(), IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME); + BeanDefinitionReaderUtils.registerBeanDefinition(schedulerHolder, registry); + } + } + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java index c0ac34f770..232f40e650 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java @@ -17,7 +17,6 @@ package org.springframework.integration.config.xml; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import org.w3c.dom.Element; @@ -27,21 +26,14 @@ import org.springframework.beans.factory.config.BeanDefinitionHolder; import org.springframework.beans.factory.support.AbstractBeanDefinition; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.support.BeanDefinitionReaderUtils; -import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.bus.ApplicationContextMessageBus; -import org.springframework.integration.channel.MessagePublishingErrorHandler; -import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; -import org.springframework.integration.scheduling.SimpleTaskScheduler; -import org.springframework.scheduling.concurrent.CustomizableThreadFactory; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.util.Assert; -import org.springframework.util.StringUtils; +import org.springframework.integration.context.IntegrationContextUtils; /** * Parser for the <message-bus> element of the integration namespace. @@ -51,35 +43,27 @@ import org.springframework.util.StringUtils; */ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { - public static final String MESSAGE_BUS_BEAN_NAME = "internal.MessageBus"; - - public static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; - private static final String MESSAGING_ANNOTATION_POST_PROCESSOR_BEAN_NAME = "internal.MessagingAnnotationPostProcessor"; - private static final String TASK_SCHEDULER_ATTRIBUTE = "task-scheduler"; - private static final String ASYNC_EVENT_MULTICASTER_ATTRIBUTE = "configure-async-event-multicaster"; - @Override - protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) - throws BeanDefinitionStoreException { - Assert.state(!parserContext.getRegistry().containsBeanDefinition(MESSAGE_BUS_BEAN_NAME), - "Only one Message Bus is allowed per ApplicationContext."); - return MESSAGE_BUS_BEAN_NAME; - } - @Override protected Class getBeanClass(Element element) { return ApplicationContextMessageBus.class; } + @Override + protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext) + throws BeanDefinitionStoreException { + return "messageBus"; + } + + @Override protected boolean isEligibleAttribute(String attributeName) { - return !TASK_SCHEDULER_ATTRIBUTE.equals(attributeName) && - !ASYNC_EVENT_MULTICASTER_ATTRIBUTE.equals(attributeName) && + return !ASYNC_EVENT_MULTICASTER_ATTRIBUTE.equals(attributeName) && !"enable-annotations".equals(attributeName) && super.isEligibleAttribute(attributeName); } @@ -87,35 +71,9 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { @Override protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) { super.doParse(element, parserContext, builder); - String taskSchedulerRef = element.getAttribute(TASK_SCHEDULER_ATTRIBUTE); - TaskExecutor taskExecutor= null; - if (!parserContext.getRegistry().containsBeanDefinition(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME)) { - RootBeanDefinition errorChannelDef = new RootBeanDefinition(QueueChannel.class); - BeanDefinitionHolder errorChannelHolder = new BeanDefinitionHolder( - errorChannelDef, ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME); - BeanDefinitionReaderUtils.registerBeanDefinition(errorChannelHolder, parserContext.getRegistry()); - } - if (StringUtils.hasText(taskSchedulerRef)) { - builder.addPropertyReference("taskScheduler", taskSchedulerRef); - } - else { - taskExecutor = this.createTaskExecutor(2, 100, 0, "message-bus-"); - BeanDefinitionBuilder schedulerBuilder = BeanDefinitionBuilder.genericBeanDefinition(SimpleTaskScheduler.class); - schedulerBuilder.addConstructorArgValue(taskExecutor); - BeanDefinitionBuilder errorHandlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MessagePublishingErrorHandler.class); - errorHandlerBuilder.addPropertyReference("defaultErrorChannel", ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME); - String errorHandlerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName( - errorHandlerBuilder.getBeanDefinition(), parserContext.getRegistry()); - schedulerBuilder.addPropertyReference("errorHandler", errorHandlerBeanName); - BeanDefinitionHolder schedulerHolder = new BeanDefinitionHolder( - schedulerBuilder.getBeanDefinition(), TASK_SCHEDULER_BEAN_NAME); - BeanDefinitionReaderUtils.registerBeanDefinition(schedulerHolder, parserContext.getRegistry()); - builder.addPropertyReference("taskScheduler", TASK_SCHEDULER_BEAN_NAME); - } + IntegrationNamespaceUtils.registerTaskSchedulerIfNecessary(parserContext.getRegistry()); if ("true".equals(element.getAttribute(ASYNC_EVENT_MULTICASTER_ATTRIBUTE).toLowerCase())) { - if (taskExecutor == null) { - taskExecutor = this.createTaskExecutor(1, 10, 0, "event-multicaster-"); - } + TaskExecutor taskExecutor = IntegrationContextUtils.createTaskExecutor(1, 10, 0, "event-multicaster-"); BeanDefinitionBuilder eventMulticasterBuilder = BeanDefinitionBuilder.genericBeanDefinition( SimpleApplicationEventMulticaster.class); eventMulticasterBuilder.addPropertyValue("taskExecutor", taskExecutor); @@ -127,19 +85,6 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser { this.addPostProcessors(element, parserContext); } - private TaskExecutor createTaskExecutor(int coreSize, int maxSize, int queueCapacity, String threadPrefix) { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(coreSize); - executor.setMaxPoolSize(maxSize); - executor.setQueueCapacity(queueCapacity); - if (StringUtils.hasText(threadPrefix)) { - executor.setThreadFactory(new CustomizableThreadFactory(threadPrefix)); - } - executor.setRejectedExecutionHandler(new CallerRunsPolicy()); - executor.afterPropertiesSet(); - return executor; - } - /** * Adds extra post-processors to the context, to inject the objects configured by the MessageBus */ diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd index 216f8d3272..fd11a928f1 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd @@ -24,8 +24,6 @@ - - @@ -175,6 +173,7 @@ + diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java new file mode 100644 index 0000000000..1a20798e95 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java @@ -0,0 +1,78 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.context; + +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +/** + * @author Mark Fisher + */ +public abstract class IntegrationContextUtils { + + public static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler"; + + public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"; + + + public static MessageChannel getErrorChannel(BeanFactory beanFactory) { + return getBeanOfType(beanFactory, ERROR_CHANNEL_BEAN_NAME, MessageChannel.class); + } + + public static TaskScheduler getTaskScheduler(BeanFactory beanFactory) { + return getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class); + } + + public static TaskScheduler getRequiredTaskScheduler(BeanFactory beanFactory) { + TaskScheduler taskScheduler = getTaskScheduler(beanFactory); + Assert.state(taskScheduler != null, "No such bean '" + TASK_SCHEDULER_BEAN_NAME + "'"); + return taskScheduler; + } + + @SuppressWarnings("unchecked") + private static T getBeanOfType(BeanFactory beanFactory, String beanName, Class type) { + if (!beanFactory.containsBean(beanName)) { + return null; + } + Object bean = beanFactory.getBean(beanName); + Assert.state(type.isAssignableFrom(bean.getClass()), "incorrect type for bean '" + beanName + + "' expected [" + type + "], but actual type is [" + bean.getClass() + "]."); + return (T) bean; + } + + public static TaskExecutor createTaskExecutor(int coreSize, int maxSize, int queueCapacity, String threadPrefix) { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(coreSize); + executor.setMaxPoolSize(maxSize); + executor.setQueueCapacity(queueCapacity); + if (StringUtils.hasText(threadPrefix)) { + executor.setThreadFactory(new CustomizableThreadFactory(threadPrefix)); + } + executor.setRejectedExecutionHandler(new CallerRunsPolicy()); + executor.afterPropertiesSet(); + return executor; + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 820a3a2cc6..415730ebc3 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,29 +16,58 @@ package org.springframework.integration.endpoint; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.BeanNameAware; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.integration.util.LifecycleSupport; +import org.springframework.util.Assert; /** * The base class for Message Endpoint implementations. * * @author Mark Fisher */ -public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware { +public abstract class AbstractEndpoint extends LifecycleSupport implements MessageEndpoint, BeanNameAware, BeanFactoryAware { - protected final Log logger = LogFactory.getLog(this.getClass()); + private volatile String beanName; - private volatile String name; + private volatile BeanFactory beanFactory; + + private volatile TaskScheduler taskScheduler; - public void setBeanName(String name) { - this.name = name; + public void setBeanName(String beanName) { + this.beanName = beanName; } + // TODO: make this final (see TODO in GatewayProxyFactoryBean) + public void setBeanFactory(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "beanFactory must not be null"); + this.beanFactory = beanFactory; + TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory); + if (taskScheduler != null) { + this.setTaskScheduler(taskScheduler); + } + } + + protected BeanFactory getBeanFactory() { + return this.beanFactory; + } + + public void setTaskScheduler(TaskScheduler taskScheduler) { + Assert.notNull(taskScheduler, "taskScheduler must not be null"); + this.taskScheduler = taskScheduler; + } + + protected TaskScheduler getTaskScheduler() { + return this.taskScheduler; + } + + @Override public String toString() { - return (this.name != null) ? this.name : super.toString(); + return (this.beanName != null) ? this.beanName : super.toString(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index 44fa623b5c..a20efe7631 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -25,11 +25,8 @@ import org.aopalliance.aop.Advice; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.Lifecycle; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.scheduling.IntervalTrigger; -import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.integration.scheduling.Trigger; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; @@ -43,8 +40,7 @@ import org.springframework.util.ClassUtils; /** * @author Mark Fisher */ -public abstract class AbstractPollingEndpoint extends AbstractEndpoint - implements TaskSchedulerAware, Lifecycle, InitializingBean, BeanClassLoaderAware { +public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements InitializingBean, BeanClassLoaderAware { public static final int MAX_MESSAGES_UNBOUNDED = -1; @@ -65,15 +61,13 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); - private volatile TaskScheduler taskScheduler; - private volatile ScheduledFuture runningTask; private volatile Runnable poller; private volatile boolean initialized; - private final Object lifecycleMonitor = new Object(); + private final Object initializationMonitor = new Object(); public void setTrigger(Trigger trigger) { @@ -93,10 +87,6 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint this.maxMessagesPerPoll = maxMessagesPerPoll; } - public void setTaskScheduler(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; - } - public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } @@ -126,13 +116,14 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint private TransactionTemplate getTransactionTemplate() { if (!this.initialized) { - this.afterPropertiesSet(); + this.onInit(); } return this.transactionTemplate; } - public void afterPropertiesSet() { - synchronized (this.lifecycleMonitor) { + @Override + protected void onInit() { + synchronized (this.initializationMonitor) { if (this.initialized) { return; } @@ -163,35 +154,24 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint } - // Lifecycle implementation + // LifecycleSupport implementation - public boolean isRunning() { - synchronized (this.lifecycleMonitor) { - return this.runningTask != null; + @Override // guarded by super#lifecycleLock + protected void doStart() { + if (!this.initialized) { + this.onInit(); } + Assert.state(this.getTaskScheduler() != null, + "unable to start polling, no taskScheduler available"); + this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger); } - public void start() { - synchronized (this.lifecycleMonitor) { - if (!this.initialized) { - this.afterPropertiesSet(); - } - if (this.isRunning()) { - return; - } - Assert.state(this.taskScheduler != null, - "unable to start polling, no taskScheduler available"); - this.runningTask = this.taskScheduler.schedule(this.poller, this.trigger); - } - } - - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.runningTask != null) { - this.runningTask.cancel(true); - } - this.runningTask = null; + @Override // guarded by super#lifecycleLock + protected void doStop() { + if (this.runningTask != null) { + this.runningTask.cancel(true); } + this.runningTask = null; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java index d5fb69c36c..4f4d19dbae 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java @@ -16,7 +16,6 @@ package org.springframework.integration.endpoint; -import org.springframework.context.Lifecycle; import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.message.MessageHandler; import org.springframework.util.Assert; @@ -27,16 +26,12 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public class EventDrivenConsumer extends AbstractEndpoint implements Lifecycle { +public class EventDrivenConsumer extends AbstractEndpoint { private final SubscribableChannel inputChannel; private final MessageHandler handler; - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler handler) { Assert.notNull(inputChannel, "inputChannel must not be null"); @@ -46,28 +41,14 @@ public class EventDrivenConsumer extends AbstractEndpoint implements Lifecycle { } - public boolean isRunning() { - synchronized (this.lifecycleMonitor) { - return this.running; - } + @Override // guarded by super#lifecycleLock + protected void doStart() { + this.inputChannel.subscribe(this.handler); } - public void start() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - this.inputChannel.subscribe(this.handler); - this.running = true; - } - } - } - - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - this.inputChannel.unsubscribe(this.handler); - this.running = false; - } - } + @Override // guarded by super#lifecycleLock + protected void doStop() { + this.inputChannel.unsubscribe(this.handler); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java index 0d70a98a04..4c7dd24abd 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java @@ -16,7 +16,6 @@ package org.springframework.integration.endpoint; -import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.channel.MessageChannelTemplate; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; @@ -28,7 +27,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public abstract class MessageProducerSupport extends AbstractEndpoint implements InitializingBean { +public abstract class MessageProducerSupport extends AbstractEndpoint { private volatile MessageChannel outputChannel; @@ -43,7 +42,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements this.channelTemplate.setSendTimeout(sendTimeout); } - public void afterPropertiesSet() { + @Override + protected void onInit() { Assert.notNull(this.outputChannel, "outputChannel is required"); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index ae6c8adb21..a668ed7b7a 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -59,15 +59,16 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint { this.channelTemplate.setSendTimeout(sendTimeout); } - public void afterPropertiesSet() { + @Override + protected void onInit() { Assert.notNull(this.source, "source must not be null"); Assert.notNull(this.outputChannel, "outputChannel must not be null"); - super.afterPropertiesSet(); if (this.maxMessagesPerPoll < 0) { // the default is 1 since a source might return // a non-null value every time it is invoked this.setMaxMessagesPerPoll(1); } + super.onInit(); } @Override diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java index 608aaa38f6..44fe7abb82 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java @@ -23,6 +23,7 @@ import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessagingException; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.MessageEndpoint; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -31,8 +32,6 @@ import org.springframework.integration.handler.ReplyMessageHolder; import org.springframework.integration.message.ErrorMessage; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.message.MessageDeliveryException; -import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.scheduling.TaskSchedulerAware; import org.springframework.util.Assert; /** @@ -43,7 +42,7 @@ import org.springframework.util.Assert; * * @author Mark Fisher */ -public abstract class AbstractMessagingGateway implements MessagingGateway, MessageEndpoint, TaskSchedulerAware, Lifecycle { +public abstract class AbstractMessagingGateway extends AbstractEndpoint implements MessagingGateway { private volatile MessageChannel requestChannel; @@ -53,16 +52,10 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess private volatile boolean shouldThrowErrors = true; - private volatile TaskScheduler taskScheduler; - private volatile MessageEndpoint replyMessageCorrelator; private final Object replyMessageCorrelatorMonitor = new Object(); - private volatile boolean running; - - private final Object lifecycleMonitor = new Object(); - /** * Set the request channel. @@ -113,10 +106,6 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess this.shouldThrowErrors = shouldThrowErrors; } - public void setTaskScheduler(TaskScheduler taskScheduler) { - this.taskScheduler = taskScheduler; - } - public void send(Object object) { Assert.state(this.requestChannel != null, "send is not supported, because no request channel has been configured"); @@ -190,7 +179,7 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess else if (this.replyChannel instanceof PollableChannel) { PollingConsumer endpoint = new PollingConsumer( (PollableChannel) this.replyChannel, handler); - endpoint.setTaskScheduler(this.taskScheduler); + endpoint.setBeanFactory(this.getBeanFactory()); endpoint.afterPropertiesSet(); correlator = endpoint; } @@ -201,25 +190,17 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess } } - public boolean isRunning() { - return this.running; - } - - public void start() { - synchronized (this.lifecycleMonitor) { - if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) { - ((Lifecycle) this.replyMessageCorrelator).start(); - } - this.running = true; + @Override // guarded by super#lifecycleLock + protected void doStart() { + if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) { + ((Lifecycle) this.replyMessageCorrelator).start(); } } - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) { - ((Lifecycle) this.replyMessageCorrelator).stop(); - } - this.running = false; + @Override // guarded by super#lifecycleLock + protected void doStop() { + if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) { + ((Lifecycle) this.replyMessageCorrelator).stop(); } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java index cf54263534..097c01fab2 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java @@ -37,11 +37,10 @@ import org.springframework.integration.annotation.Gateway; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.message.MethodParameterMessageMapper; -import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.StringUtils; @@ -52,8 +51,8 @@ import org.springframework.util.StringUtils; * * @author Mark Fisher */ -public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, BeanClassLoaderAware, BeanFactoryAware, - InitializingBean, Lifecycle { +public class GatewayProxyFactoryBean extends AbstractEndpoint implements FactoryBean, MethodInterceptor, BeanClassLoaderAware, BeanFactoryAware, + InitializingBean { private volatile Class serviceInterface; @@ -73,18 +72,12 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, private final Map gatewayMap = new HashMap(); - private volatile TaskScheduler taskScheduler; - private volatile ChannelResolver channelResolver; private volatile boolean initialized; - private volatile boolean running; - private final Object initializationMonitor = new Object(); - private final Object lifecycleMonitor = new Object(); - public void setServiceInterface(Class serviceInterface) { if (serviceInterface != null && !serviceInterface.isInterface()) { @@ -144,12 +137,14 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, this.beanClassLoader = beanClassLoader; } + @Override // TODO: remove this and move channelResolver to parent class public void setBeanFactory(BeanFactory beanFactory) { - this.taskScheduler = (TaskScheduler) beanFactory.getBean(MessageBusParser.TASK_SCHEDULER_BEAN_NAME); + super.setBeanFactory(beanFactory); this.channelResolver = new BeanFactoryChannelResolver(beanFactory); } - public void afterPropertiesSet() throws Exception { + @Override + protected void onInit() throws Exception { synchronized (this.initializationMonitor) { if (this.initialized) { return; @@ -226,7 +221,9 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, private MessagingGateway createGatewayForMethod(Method method) throws Exception { SimpleMessagingGateway gateway = new SimpleMessagingGateway( new MethodParameterMessageMapper(method), new SimpleMessageMapper()); - gateway.setTaskScheduler(this.taskScheduler); + if (this.getTaskScheduler() != null) { + gateway.setTaskScheduler(this.getTaskScheduler()); + } Gateway gatewayAnnotation = method.getAnnotation(Gateway.class); MessageChannel requestChannel = this.defaultRequestChannel; MessageChannel replyChannel = this.defaultReplyChannel; @@ -251,37 +248,28 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, gateway.setReplyChannel(replyChannel); gateway.setRequestTimeout(requestTimeout); gateway.setReplyTimeout(replyTimeout); + if (this.getBeanFactory() != null) { + gateway.setBeanFactory(this.getBeanFactory()); + } return gateway; } // Lifecycle implementation - public boolean isRunning() { - return this.running; - } - - public void start() { - synchronized (this.lifecycleMonitor) { - if (!this.running) { - for (MessagingGateway gateway : this.gatewayMap.values()) { - if (gateway instanceof Lifecycle) { - ((Lifecycle) gateway).start(); - } - } - this.running = true; + @Override // guarded by super#lifecycleLock + protected void doStart() { + for (MessagingGateway gateway : this.gatewayMap.values()) { + if (gateway instanceof Lifecycle) { + ((Lifecycle) gateway).start(); } } } - public void stop() { - synchronized (this.lifecycleMonitor) { - if (this.running) { - for (MessagingGateway gateway : this.gatewayMap.values()) { - if (gateway instanceof Lifecycle) { - ((Lifecycle) gateway).stop(); - } - } - this.running = false; + @Override // guarded by super#lifecycleLock + protected void doStop() { + for (MessagingGateway gateway : this.gatewayMap.values()) { + if (gateway instanceof Lifecycle) { + ((Lifecycle) gateway).stop(); } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java index fbeeacc4be..e6d5f3c406 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java @@ -26,14 +26,18 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; - +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.DisposableBean; import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.channel.BeanFactoryChannelResolver; +import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.util.ErrorHandler; +import org.springframework.integration.util.LifecycleSupport; import org.springframework.scheduling.SchedulingException; import org.springframework.util.Assert; @@ -43,7 +47,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @author Marius Bogoevici */ -public class SimpleTaskScheduler implements TaskScheduler, DisposableBean { +public class SimpleTaskScheduler extends LifecycleSupport implements TaskScheduler, BeanFactoryAware, DisposableBean { private final Log logger = LogFactory.getLog(this.getClass()); @@ -57,14 +61,11 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean { private final Set> executingTasks = Collections.synchronizedSet(new TreeSet>()); - private volatile boolean running; - - private final ReentrantLock lifecycleLock = new ReentrantLock(); - public SimpleTaskScheduler(TaskExecutor executor) { Assert.notNull(executor, "executor must not be null"); this.executor = executor; + this.setAutoStartMode(AutoStartMode.ON_CONTEXT_REFRESH); } @@ -72,7 +73,14 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean { this.errorHandler = errorHandler; } + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + if (this.errorHandler == null) { + this.errorHandler = new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(beanFactory)); + } + } + public final ScheduledFuture schedule(Runnable task, Trigger trigger) { + Assert.notNull(task, "task must not be null"); TriggeredTask triggeredTask = new TriggeredTask(task, trigger); return this.schedule(triggeredTask, null, null); } @@ -87,56 +95,28 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean { } - // Lifecycle implementation + // LifecycleSupport implementation - public boolean isRunning() { - this.lifecycleLock.lock(); - try { - return this.running; - } - finally { - this.lifecycleLock.unlock(); - } + @Override // guarded by super#lifecycleLock + protected void doStart() { + this.executor.execute(this.schedulerTask = new SchedulerTask()); } - public void start() { - this.lifecycleLock.lock(); - try { - if (this.running) { - return; - } - this.running = true; - this.executor.execute(this.schedulerTask = new SchedulerTask()); + @Override // guarded by super#lifecycleLock + protected void doStop() { + this.schedulerTask.deactivate(); + Thread executingThread = this.schedulerTask.executingThread.get(); + if (executingThread != null) { + executingThread.interrupt(); } - finally { - this.lifecycleLock.unlock(); - } - } - - public void stop() { - this.lifecycleLock.lock(); - try { - if (!this.running) { - return; + this.scheduledTasks.clear(); + synchronized (this.executingTasks) { + for (TriggeredTask task : this.executingTasks) { + task.cancel(true); } - this.running = false; - this.schedulerTask.deactivate(); - Thread executingThread = this.schedulerTask.executingThread.get(); - if (executingThread != null) { - executingThread.interrupt(); - } - this.scheduledTasks.clear(); - synchronized (this.executingTasks) { - for (TriggeredTask task : this.executingTasks) { - task.cancel(true); - } - this.executingTasks.clear(); - } - this.schedulerTask = null; - } - finally { - this.lifecycleLock.unlock(); + this.executingTasks.clear(); } + this.schedulerTask = null; } public void destroy() throws Exception { @@ -236,7 +216,7 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean { public long getDelay(TimeUnit unit) { long now = new Date().getTime(); - long scheduled = this.scheduledTime.getTime(); + long scheduled = (this.scheduledTime != null) ? this.scheduledTime.getTime() : now; return (scheduled > now) ? unit.convert(scheduled - now, TimeUnit.MILLISECONDS) : 0; } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java new file mode 100644 index 0000000000..64406d3305 --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java @@ -0,0 +1,142 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.util; + +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.Lifecycle; +import org.springframework.context.event.ContextRefreshedEvent; + +/** + * A convenience base class for Lifecycle components that supports an + * "auto-startup" mode property. Depending on the mode, the component can + * be started either upon initialization, upon receiving the + * {@link ContextRefreshedEvent}, or may require an explicit start invocation. + * The timing of the startup is determined by the value of {@link #autoStartMode}. + * The default value is {@link AutoStartMode#ON_INIT}. To require explicit startup, + * set the mode to {@link AutoStartMode#NONE} using the + * {@link #setAutoStartMode(AutoStartMode)} method. + * + * @author Mark Fisher + */ +public abstract class LifecycleSupport implements Lifecycle, InitializingBean, ApplicationListener { + + public static enum AutoStartMode { ON_INIT, ON_CONTEXT_REFRESH, NONE } + + + protected final Log logger = LogFactory.getLog(this.getClass()); + + private volatile AutoStartMode autoStartMode = AutoStartMode.ON_INIT; + + private volatile boolean running; + + private final ReentrantLock lifecycleLock = new ReentrantLock(); + + + public void setAutoStartMode(AutoStartMode autoStartMode) { + this.autoStartMode = (autoStartMode != null) ? autoStartMode : AutoStartMode.NONE; + } + + public final void afterPropertiesSet() { + try { + this.onInit(); + if (this.autoStartMode == AutoStartMode.ON_INIT) { + this.start(); + } + } + catch (Exception e) { + throw new BeanInitializationException("failed to initialize", e); + } + } + + public final void onApplicationEvent(ApplicationEvent event) { + this.onEvent(event); + if (event instanceof ContextRefreshedEvent && this.autoStartMode == AutoStartMode.ON_CONTEXT_REFRESH) { + this.start(); + } + } + + // Lifecycle implementation + + public final boolean isRunning() { + this.lifecycleLock.lock(); + try { + return this.running; + } + finally { + this.lifecycleLock.unlock(); + } + } + + public final void start() { + this.lifecycleLock.lock(); + try { + if (!this.running) { + this.doStart(); + this.running = true; + if (logger.isInfoEnabled()) { + logger.info("started " + this); + } + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + public final void stop() { + this.lifecycleLock.lock(); + try { + if (this.running) { + this.doStop(); + this.running = false; + if (logger.isInfoEnabled()) { + logger.info("stopped " + this); + } + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + protected void onInit() throws Exception { + } + + protected void onEvent(ApplicationEvent event) { + } + + /** + * Subclasses must implement this method with the start behavior. + * This method will be invoked while holding the {@link #lifecycleLock}. + */ + protected abstract void doStart(); + + /** + * Subclasses must implement this method with the stop behavior. + * This method will be invoked while holding the {@link #lifecycleLock}. + */ + protected abstract void doStop(); + +} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java index d2cc9c4f53..02c4ef934c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java @@ -26,21 +26,18 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.springframework.beans.factory.BeanCreationException; -import org.springframework.context.Lifecycle; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.PublishSubscribeChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.config.xml.MessageBusParser; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Message; +import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; -import org.springframework.integration.endpoint.EventDrivenConsumer; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.ReplyMessageHolder; import org.springframework.integration.message.ErrorMessage; @@ -49,8 +46,8 @@ import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.MessageSource; import org.springframework.integration.message.StringMessage; import org.springframework.integration.scheduling.IntervalTrigger; -import org.springframework.integration.scheduling.SimpleTaskScheduler; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher @@ -59,13 +56,11 @@ public class ApplicationContextMessageBusTests { @Test public void endpointRegistrationWithInputChannelReference() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel sourceChannel = new QueueChannel(); QueueChannel targetChannel = new QueueChannel(); - sourceChannel.setBeanName("sourceChannel"); - targetChannel.setBeanName("targetChannel"); - context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel); - context.getBeanFactory().registerSingleton("targetChannel", targetChannel); + context.registerChannel("sourceChannel", sourceChannel); + context.registerChannel("targetChannel", targetChannel); Message message = MessageBuilder.withPayload("test") .setReplyChannelName("targetChannel").build(); sourceChannel.send(message); @@ -76,37 +71,25 @@ public class ApplicationContextMessageBusTests { }; handler.setBeanFactory(context); PollingConsumer endpoint = new PollingConsumer(sourceChannel, handler); - endpoint.afterPropertiesSet(); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus); - bus.setApplicationContext(context); - bus.start(); Message result = targetChannel.receive(3000); assertEquals("test", result.getPayload()); - bus.stop(); + context.stop(); } @Test public void channelsWithoutHandlers() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel sourceChannel = new QueueChannel(); - sourceChannel.setBeanName("sourceChannel"); - context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel); + context.registerChannel("sourceChannel", sourceChannel); sourceChannel.send(new StringMessage("test")); QueueChannel targetChannel = new QueueChannel(); - targetChannel.setBeanName("targetChannel"); - context.getBeanFactory().registerSingleton("targetChannel", targetChannel); + context.registerChannel("targetChannel", targetChannel); context.refresh(); - bus.start(); Message result = targetChannel.receive(100); assertNull(result); - bus.stop(); + context.stop(); } @Test @@ -116,15 +99,13 @@ public class ApplicationContextMessageBusTests { PollableChannel sourceChannel = (PollableChannel) context.getBean("sourceChannel"); sourceChannel.send(new GenericMessage("test")); PollableChannel targetChannel = (PollableChannel) context.getBean("targetChannel"); - Lifecycle bus = (Lifecycle) context.getBean("bus"); - bus.start(); Message result = targetChannel.receive(1000); assertEquals("test", result.getPayload()); } @Test public void exactlyOneConsumerReceivesPointToPointMessage() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); @@ -140,35 +121,26 @@ public class ApplicationContextMessageBusTests { replyHolder.set(message); } }; - inputChannel.setBeanName("input"); - outputChannel1.setBeanName("output1"); - outputChannel2.setBeanName("output2"); - context.getBeanFactory().registerSingleton("input", inputChannel); - context.getBeanFactory().registerSingleton("output1", outputChannel1); - context.getBeanFactory().registerSingleton("output2", outputChannel2); + context.registerChannel("input", inputChannel); + context.registerChannel("output1", outputChannel1); + context.registerChannel("output2", outputChannel2); handler1.setOutputChannel(outputChannel1); handler2.setOutputChannel(outputChannel2); PollingConsumer endpoint1 = new PollingConsumer(inputChannel, handler1); - endpoint1.afterPropertiesSet(); PollingConsumer endpoint2 = new PollingConsumer(inputChannel, handler2); - endpoint2.afterPropertiesSet(); - context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); - context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint1", endpoint1); + context.registerEndpoint("testEndpoint2", endpoint2); context.refresh(); - bus.start(); inputChannel.send(new StringMessage("testing")); Message message1 = outputChannel1.receive(500); Message message2 = outputChannel2.receive(0); - bus.stop(); + context.stop(); assertTrue("exactly one message should be null", message1 == null ^ message2 == null); } @Test public void bothConsumersReceivePublishSubscribeMessage() throws InterruptedException { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); PublishSubscribeChannel inputChannel = new PublishSubscribeChannel(); QueueChannel outputChannel1 = new QueueChannel(); QueueChannel outputChannel2 = new QueueChannel(); @@ -187,60 +159,45 @@ public class ApplicationContextMessageBusTests { latch.countDown(); } }; - inputChannel.setBeanName("input"); - outputChannel1.setBeanName("output1"); - outputChannel2.setBeanName("output2"); - context.getBeanFactory().registerSingleton("input", inputChannel); - context.getBeanFactory().registerSingleton("output1", outputChannel1); - context.getBeanFactory().registerSingleton("output2", outputChannel2); + context.registerChannel("input", inputChannel); + context.registerChannel("output1", outputChannel1); + context.registerChannel("output2", outputChannel2); handler1.setOutputChannel(outputChannel1); handler2.setOutputChannel(outputChannel2); EventDrivenConsumer endpoint1 = new EventDrivenConsumer(inputChannel, handler1); EventDrivenConsumer endpoint2 = new EventDrivenConsumer(inputChannel, handler2); - context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1); - context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint1", endpoint1); + context.registerEndpoint("testEndpoint2", endpoint2); context.refresh(); - bus.start(); inputChannel.send(new StringMessage("testing")); latch.await(500, TimeUnit.MILLISECONDS); assertEquals("both handlers should have been invoked", 0, latch.getCount()); Message message1 = outputChannel1.receive(500); Message message2 = outputChannel2.receive(500); - bus.stop(); + context.stop(); assertNotNull("both handlers should have replied to the message", message1); assertNotNull("both handlers should have replied to the message", message2); } @Test public void errorChannelWithFailedDispatch() throws InterruptedException { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel errorChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - errorChannel.setBeanName("errorChannel"); - context.getBeanFactory().registerSingleton("errorChannel", errorChannel); + context.registerChannel("errorChannel", errorChannel); CountDownLatch latch = new CountDownLatch(1); SourcePollingChannelAdapter channelAdapter = new SourcePollingChannelAdapter(); channelAdapter.setSource(new FailingSource(latch)); channelAdapter.setTrigger(new IntervalTrigger(1000)); channelAdapter.setOutputChannel(outputChannel); - channelAdapter.setBeanName("testChannel"); - context.getBeanFactory().registerSingleton("testChannel", channelAdapter); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - SimpleTaskScheduler taskScheduler = (SimpleTaskScheduler) TestUtils.createTaskScheduler(10); + context.registerEndpoint("testChannel", channelAdapter); ChannelResolver channelResolver = new BeanFactoryChannelResolver(context); MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler(channelResolver); errorHandler.setDefaultErrorChannel(errorChannel); - taskScheduler.setErrorHandler(errorHandler); - bus.setTaskScheduler(taskScheduler); - bus.setApplicationContext(context); context.refresh(); - bus.start(); latch.await(2000, TimeUnit.MILLISECONDS); Message message = errorChannel.receive(5000); - bus.stop(); + context.stop(); assertNull(outputChannel.receive(0)); assertNotNull("message should not be null", message); assertTrue(message instanceof ErrorMessage); @@ -248,17 +205,11 @@ public class ApplicationContextMessageBusTests { assertEquals("intentional test failure", exception.getMessage()); } - @Test(expected = BeanCreationException.class) - public void multipleMessageBusBeans() { - new ClassPathXmlApplicationContext("multipleMessageBusBeans.xml", this.getClass()); - } - @Test public void consumerSubscribedToErrorChannel() throws InterruptedException { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel errorChannel = new QueueChannel(); - errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME); - context.getBeanFactory().registerSingleton(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel); + context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel); final CountDownLatch latch = new CountDownLatch(1); AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() { @Override @@ -267,16 +218,12 @@ public class ApplicationContextMessageBusTests { } }; PollingConsumer endpoint = new PollingConsumer(errorChannel, handler); - endpoint.afterPropertiesSet(); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - bus.start(); errorChannel.send(new ErrorMessage(new RuntimeException("test-exception"))); latch.await(1000, TimeUnit.MILLISECONDS); assertEquals("handler should have received error message", 0, latch.getCount()); + context.stop(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java index b7428d06f9..16f1039a5c 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java @@ -21,14 +21,13 @@ import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.channel.ThreadLocalChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; -import org.springframework.integration.config.xml.MessageBusParser; +import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessagingException; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -37,15 +36,14 @@ import org.springframework.integration.handler.ReplyMessageHolder; import org.springframework.integration.handler.ServiceActivatingHandler; import org.springframework.integration.message.StringMessage; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher */ public class DirectChannelSubscriptionTests { - private GenericApplicationContext context = new GenericApplicationContext(); - - private ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); + private TestApplicationContext context = TestUtils.createTestApplicationContext(); private DirectChannel sourceChannel = new DirectChannel(); @@ -54,30 +52,23 @@ public class DirectChannelSubscriptionTests { @Before public void setupChannels() { - sourceChannel.setBeanName("sourceChannel"); - targetChannel.setBeanName("targetChannel"); - context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel); - context.getBeanFactory().registerSingleton("targetChannel", targetChannel); - context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus); - bus.setApplicationContext(context); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); + context.registerChannel("sourceChannel", sourceChannel); + context.registerChannel("targetChannel", targetChannel); } @Test public void sendAndReceiveForRegisteredEndpoint() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "handle"); serviceActivator.setOutputChannel(targetChannel); EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, serviceActivator); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - bus.start(); this.sourceChannel.send(new StringMessage("foo")); Message response = this.targetChannel.receive(); assertEquals("foo!", response.getPayload()); - bus.stop(); + context.stop(); } @Test @@ -88,11 +79,10 @@ public class DirectChannelSubscriptionTests { TestEndpoint endpoint = new TestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); context.refresh(); - bus.start(); this.sourceChannel.send(new StringMessage("foo")); Message response = this.targetChannel.receive(); assertEquals("foo-from-annotated-endpoint", response.getPayload()); - bus.stop(); + context.stop(); } @Test(expected = MessagingException.class) @@ -105,27 +95,32 @@ public class DirectChannelSubscriptionTests { }; handler.setOutputChannel(targetChannel); EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, handler); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - bus.start(); - this.sourceChannel.send(new StringMessage("foo")); + try { + this.sourceChannel.send(new StringMessage("foo")); + } + finally { + context.stop(); + } } @Test(expected = MessagingException.class) public void exceptionThrownFromAnnotatedEndpoint() { QueueChannel errorChannel = new QueueChannel(); - errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME); - context.getBeanFactory().registerSingleton( - ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel); + context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); FailingTestEndpoint endpoint = new FailingTestEndpoint(); postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint"); context.refresh(); - bus.start(); - this.sourceChannel.send(new StringMessage("foo")); + try { + this.sourceChannel.send(new StringMessage("foo")); + } + finally { + context.stop(); + } } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java deleted file mode 100644 index 037196c959..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.bus; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - -import org.junit.Test; - -import org.springframework.beans.factory.support.RootBeanDefinition; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.integration.config.xml.MessageBusParser; -import org.springframework.integration.util.TestUtils; - -/** - * @author Marius Bogoevici - * @author Mark Fisher - */ -public class MessageBusEventTests { - - @Test - public void messageBusStartedEvent() { - GenericApplicationContext context = new GenericApplicationContext(); - context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class)); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); - context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener"); - assertNull(listener.startedBus); - assertNull(listener.stoppedBus); - context.refresh(); // bus will start - assertNotNull(listener.startedBus); - assertNull(listener.stoppedBus); - assertEquals(messageBus, listener.startedBus); - } - - @Test - public void messageBusStoppedEvent() { - GenericApplicationContext context = new GenericApplicationContext(); - context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class)); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); - context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener"); - assertNull(listener.startedBus); - assertNull(listener.stoppedBus); - context.refresh(); - assertNotNull(listener.startedBus); - assertNull(listener.stoppedBus); - messageBus.stop(); - assertNotNull(listener.stoppedBus); - assertEquals(messageBus, listener.stoppedBus); - } - - - public static class TestMessageBusEventListener implements ApplicationListener { - - private volatile ApplicationContextMessageBus startedBus; - - private volatile ApplicationContextMessageBus stoppedBus; - - - public ApplicationContextMessageBus getStartedBus() { - return this.startedBus; - } - - public ApplicationContextMessageBus getStoppedBus() { - return this.stoppedBus; - } - - public void onApplicationEvent(ApplicationEvent event) { - if (event instanceof MessageBusStartedEvent) { - this.startedBus = (ApplicationContextMessageBus) event.getSource(); - } - if (event instanceof MessageBusStoppedEvent) { - this.stoppedBus = (ApplicationContextMessageBus) event.getSource(); - } - } - } - -} diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml index bbd695a2ba..37e106e047 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml @@ -4,13 +4,9 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - - - - - - + + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml deleted file mode 100644 index 2e6ab1b168..0000000000 --- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml +++ /dev/null @@ -1,11 +0,0 @@ - - - - - - - - diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java index cef324ca90..eb8dfe68f5 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java @@ -26,11 +26,10 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.endpoint.PollingConsumer; @@ -39,19 +38,22 @@ import org.springframework.integration.handler.ReplyMessageHolder; import org.springframework.integration.message.MessageBuilder; import org.springframework.integration.message.StringMessage; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher */ public class MessageChannelTemplateTests { + private TestApplicationContext context = TestUtils.createTestApplicationContext(); + private QueueChannel requestChannel; @Before public void setUp() { this.requestChannel = new QueueChannel(); - this.requestChannel.setBeanName("requestChannel"); + context.registerChannel("requestChannel", requestChannel); AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() { @Override public void handleRequestMessage(Message message, ReplyMessageHolder replyHolder) { @@ -59,17 +61,14 @@ public class MessageChannelTemplateTests { } }; PollingConsumer endpoint = new PollingConsumer(requestChannel, handler); - endpoint.afterPropertiesSet(); - GenericApplicationContext context = new GenericApplicationContext(); - context.getBeanFactory().registerSingleton("requestChannel", requestChannel); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - bus.start(); } + @After + public void tearDown() { + context.stop(); + } @Test public void send() { diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml index bc016f29ef..7b16df9763 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml @@ -7,7 +7,7 @@ http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-1.0.xsd"> - + @@ -17,7 +17,7 @@ - + diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java index 5af6aefa5d..e1a47776eb 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java @@ -21,18 +21,19 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import org.junit.After; +import org.junit.Before; import org.junit.Test; -import org.springframework.context.Lifecycle; +import org.springframework.context.support.AbstractApplicationContext; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolutionException; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PollableChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; -import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.message.StringMessage; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; @@ -43,13 +44,22 @@ import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; @ContextConfiguration public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests { + @Before + public void startContext() { + ((AbstractApplicationContext) this.applicationContext).start(); + } + + @After + public void stopContext() { + ((AbstractApplicationContext) this.applicationContext).stop(); + } + + @Test public void targetOnly() { String beanName = "outboundWithImplicitChannel"; Object channel = this.applicationContext.getBean(beanName); assertTrue(channel instanceof DirectChannel); - Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - bus.start(); BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext); assertNotNull(channelResolver.resolveChannelName(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); @@ -61,7 +71,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertTrue(((MessageChannel) channel).send(message)); assertNotNull(consumer.getLastMessage()); assertEquals(message, consumer.getLastMessage()); - bus.stop(); } @Test @@ -69,8 +78,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests String beanName = "methodInvokingConsumer"; Object channel = this.applicationContext.getBean(beanName); assertTrue(channel instanceof DirectChannel); - Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - bus.start(); BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext); assertNotNull(channelResolver.resolveChannelName(beanName)); Object adapter = this.applicationContext.getBean(beanName + ".adapter"); @@ -82,24 +89,20 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests assertTrue(((MessageChannel) channel).send(message)); assertNotNull(testBean.getMessage()); assertEquals("consumer test", testBean.getMessage()); - bus.stop(); } @Test public void methodInvokingSource() { String beanName = "methodInvokingSource"; PollableChannel channel = (PollableChannel) this.applicationContext.getBean("queueChannel"); - Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - bus.start(); + TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); + testBean.store("source test"); Object adapter = this.applicationContext.getBean(beanName); assertNotNull(adapter); assertTrue(adapter instanceof SourcePollingChannelAdapter); - TestBean testBean = (TestBean) this.applicationContext.getBean("testBean"); - testBean.store("source test"); Message message = channel.receive(1000); assertNotNull(message); assertEquals("source test", testBean.getMessage()); - bus.stop(); } @Test(expected = ChannelResolutionException.class) diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java index 7c106caacb..4d581fac71 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java @@ -17,25 +17,16 @@ package org.springframework.integration.config; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import org.junit.Test; import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.BeanCreationException; -import org.springframework.beans.factory.BeanDefinitionStoreException; import org.springframework.context.ApplicationContext; -import org.springframework.context.Lifecycle; import org.springframework.context.event.SimpleApplicationEventMulticaster; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.core.task.SyncTaskExecutor; -import org.springframework.integration.bus.MessageBusEventTests.TestMessageBusEventListener; import org.springframework.integration.channel.BeanFactoryChannelResolver; -import org.springframework.integration.config.xml.MessageBusParser; -import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** @@ -60,42 +51,6 @@ public class MessageBusParserTests { assertEquals(context.getBean("errorChannel"), resolver.resolveChannelName("errorChannel")); } - @Test - public void testMultipleMessageBusElements() { - boolean exceptionThrown = false; - try { - new ClassPathXmlApplicationContext("multipleMessageBusElements.xml", this.getClass()); - } - catch (BeanDefinitionStoreException e) { - exceptionThrown = true; - assertEquals(IllegalStateException.class, e.getCause().getClass()); - } - assertTrue(exceptionThrown); - } - - @Test - public void testMessageBusElementAndBean() { - boolean exceptionThrown = false; - try { - new ClassPathXmlApplicationContext("messageBusElementAndBean.xml", this.getClass()); - } - catch (BeanCreationException e) { - exceptionThrown = true; - assertEquals(IllegalStateException.class, e.getCause().getClass()); - assertEquals(e.getBeanName(), MessageBusParser.MESSAGE_BUS_BEAN_NAME); - } - assertTrue(exceptionThrown); - } - - @Test - public void testAutoStartup() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageBusWithAutoStartup.xml", this.getClass()); - Lifecycle bus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - assertTrue(bus.isRunning()); - bus.stop(); - } - @Test public void testMulticasterIsSyncByDefault() { ApplicationContext context = new ClassPathXmlApplicationContext( @@ -131,42 +86,4 @@ public class MessageBusParserTests { assertEquals(ThreadPoolTaskExecutor.class, taskExecutor.getClass()); } - @Test - public void testMessageBusEventListenerReceivesStartedEvent() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageBusWithListener.xml", this.getClass()); - Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener"); - assertNull(listener.getStartedBus()); - assertNull(listener.getStoppedBus()); - messageBus.start(); - assertNotNull(listener.getStartedBus()); - assertEquals(messageBus, listener.getStartedBus()); - assertNull(listener.getStoppedBus()); - } - - @Test - public void testMessageBusEventListenerReceivesStoppedEvent() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageBusWithListener.xml", this.getClass()); - Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener"); - assertNull(listener.getStoppedBus()); - messageBus.start(); - messageBus.stop(); - assertNotNull(listener.getStoppedBus()); - assertEquals(messageBus, listener.getStoppedBus()); - } - - @Test - public void testMessageBusWithTaskScheduler() { - ApplicationContext context = new ClassPathXmlApplicationContext( - "messageBusWithTaskScheduler.xml", this.getClass()); - Object messageBus = context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME); - StubTaskScheduler schedulerBean = (StubTaskScheduler) context.getBean("testScheduler"); - TaskScheduler busScheduler = (TaskScheduler) new DirectFieldAccessor(messageBus).getPropertyValue("taskScheduler"); - assertNotNull(busScheduler); - assertEquals(schedulerBean, busScheduler); - } - } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java index a96276406f..21b3be34fe 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java @@ -25,16 +25,14 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.beans.factory.support.RootBeanDefinition; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.message.StringMessage; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher @@ -44,19 +42,14 @@ public class ServiceActivatorAnnotationPostProcessorTests { @Test public void testAnnotatedMethod() throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); + RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class); + context.registerBeanDefinition("postProcessor", postProcessorDef); context.registerBeanDefinition("testChannel", new RootBeanDefinition(QueueChannel.class)); RootBeanDefinition beanDefinition = new RootBeanDefinition(SimpleServiceActivatorAnnotationTestBean.class); beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(latch); context.registerBeanDefinition("testBean", beanDefinition); - String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME; - RootBeanDefinition busBeanDefinition = new RootBeanDefinition(ApplicationContextMessageBus.class); - busBeanDefinition.getPropertyValues().addPropertyValue("taskScheduler", TestUtils.createTaskScheduler(10)); - context.registerBeanDefinition(busBeanName, busBeanDefinition); - RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class); - context.registerBeanDefinition("postProcessor", postProcessorDef); context.refresh(); - context.start(); SimpleServiceActivatorAnnotationTestBean testBean = (SimpleServiceActivatorAnnotationTestBean) context.getBean("testBean"); assertEquals(1, latch.getCount()); assertNull(testBean.getMessageText()); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java index f1e2f03a1a..d98b5e1ce8 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java @@ -26,9 +26,9 @@ import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.support.AbstractApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; @@ -55,7 +55,7 @@ public class AnnotatedEndpointActivationTests { private PollableChannel output; @Autowired - private ApplicationContextMessageBus messageBus; + private AbstractApplicationContext applicationContext; // This has to be static because the MessageBus registers the handler // more than once (every time a test instance is created), but only one of @@ -90,15 +90,15 @@ public class AnnotatedEndpointActivationTests { } @Test(expected = MessageDeliveryException.class) - public void stopMessageBus() { - messageBus.stop(); + public void stopContext() { + applicationContext.stop(); this.input.send(new GenericMessage("foo")); } @Test - public void stopAndRestartMessageBus() { - messageBus.stop(); - messageBus.start(); + public void stopAndRestartContext() { + applicationContext.stop(); + applicationContext.start(); this.input.send(new GenericMessage("foo")); Message message = this.output.receive(100); assertNotNull(message); diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java index 2205b3c724..79291500e0 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java @@ -28,22 +28,18 @@ import org.junit.Test; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.DirectFieldAccessor; -import org.springframework.beans.factory.NoSuchBeanDefinitionException; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.ChannelAdapter; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Poller; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.annotation.Transformer; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.ChannelResolver; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.endpoint.PollingConsumer; @@ -53,6 +49,7 @@ import org.springframework.integration.message.StringMessage; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher @@ -61,14 +58,9 @@ public class MessagingAnnotationPostProcessorTests { @Test public void serviceActivatorAnnotation() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -131,12 +123,7 @@ public class MessagingAnnotationPostProcessorTests { @Test public void targetAnnotation() throws InterruptedException { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + TestApplicationContext context = TestUtils.createTestApplicationContext(); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -144,14 +131,13 @@ public class MessagingAnnotationPostProcessorTests { OutboundChannelAdapterTestBean testBean = new OutboundChannelAdapterTestBean(latch); postProcessor.postProcessAfterInitialization(testBean, "testBean"); context.refresh(); - messageBus.start(); ChannelResolver channelResolver = new BeanFactoryChannelResolver(context); MessageChannel testChannel = channelResolver.resolveChannelName("testChannel"); testChannel.send(new StringMessage("foo")); latch.await(1000, TimeUnit.MILLISECONDS); assertEquals(0, latch.getCount()); assertEquals("foo", testBean.getMessageText()); - messageBus.stop(); + context.stop(); } @Test(expected = IllegalArgumentException.class) @@ -161,29 +147,13 @@ public class MessagingAnnotationPostProcessorTests { postProcessor.afterPropertiesSet(); } - @Test(expected = NoSuchBeanDefinitionException.class) - public void testPostProcessorWithoutMessageBus() { - GenericApplicationContext context = new GenericApplicationContext(); - MessagingAnnotationPostProcessor postProcessor = - new MessagingAnnotationPostProcessor(); - postProcessor.setBeanFactory(context.getBeanFactory()); - postProcessor.afterPropertiesSet(); - } - @Test public void testChannelResolution() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); DirectChannel inputChannel = new DirectChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -195,22 +165,16 @@ public class MessagingAnnotationPostProcessorTests { inputChannel.send(message); Message reply = outputChannel.receive(0); assertNotNull(reply); + context.stop(); } @Test public void testProxiedMessageEndpointAnnotation() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -218,53 +182,37 @@ public class MessagingAnnotationPostProcessorTests { Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testMessageEndpointAnnotationInherited() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testMessageEndpointAnnotationInheritedWithProxy() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -272,79 +220,55 @@ public class MessagingAnnotationPostProcessorTests { Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("world")); Message message = outputChannel.receive(1000); assertEquals("hello world", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testMessageEndpointAnnotationInheritedFromInterface() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); assertEquals("test-ABC", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); assertEquals("test-ABC", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel inputChannel = new QueueChannel(); QueueChannel outputChannel = new QueueChannel(); - inputChannel.setBeanName("inputChannel"); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("inputChannel", inputChannel); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -352,23 +276,17 @@ public class MessagingAnnotationPostProcessorTests { Object proxy = proxyFactory.getProxy(); postProcessor.postProcessAfterInitialization(proxy, "proxy"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("ABC")); Message message = outputChannel.receive(1000); assertEquals("test-ABC", message.getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testEndpointWithPollerAnnotation() { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + TestApplicationContext context = TestUtils.createTestApplicationContext(); QueueChannel testChannel = new QueueChannel(); - testChannel.setBeanName("testChannel"); - context.getBeanFactory().registerSingleton("testChannel", testChannel); - messageBus.setApplicationContext(context); + context.registerChannel("testChannel", testChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -385,19 +303,12 @@ public class MessagingAnnotationPostProcessorTests { @Test public void testChannelAdapterAnnotation() throws InterruptedException { - GenericApplicationContext context = new GenericApplicationContext(); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + TestApplicationContext context = TestUtils.createTestApplicationContext(); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean(); postProcessor.postProcessAfterInitialization(testBean, "testBean"); - context.refresh(); - messageBus.start(); ChannelResolver channelResolver = new BeanFactoryChannelResolver(context); DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel"); final CountDownLatch latch = new CountDownLatch(1); @@ -408,26 +319,21 @@ public class MessagingAnnotationPostProcessorTests { latch.countDown(); } }); + context.refresh(); latch.await(3, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); assertNotNull(receivedMessage.get()); assertEquals("test", receivedMessage.get().getPayload()); - messageBus.stop(); + context.stop(); } @Test public void testTransformer() { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); DirectChannel inputChannel = new DirectChannel(); - inputChannel.setBeanName("inputChannel"); - context.getBeanFactory().registerSingleton("inputChannel", inputChannel); + context.registerChannel("inputChannel", inputChannel); QueueChannel outputChannel = new QueueChannel(); - outputChannel.setBeanName("outputChannel"); - context.getBeanFactory().registerSingleton("outputChannel", outputChannel); - ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); - context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("outputChannel", outputChannel); MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor(); postProcessor.setBeanFactory(context.getBeanFactory()); postProcessor.afterPropertiesSet(); @@ -437,6 +343,7 @@ public class MessagingAnnotationPostProcessorTests { inputChannel.send(new StringMessage("foo")); Message reply = outputChannel.receive(0); assertEquals("FOO", reply.getPayload()); + context.stop(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java index 1e8624c7f7..18dcae80da 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java @@ -21,25 +21,21 @@ import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Router; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.Message; import org.springframework.integration.message.StringMessage; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher */ public class RouterAnnotationPostProcessorTests { - private GenericApplicationContext context = new GenericApplicationContext(); - - private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); + private TestApplicationContext context = TestUtils.createTestApplicationContext(); private DirectChannel inputChannel = new DirectChannel(); @@ -48,14 +44,8 @@ public class RouterAnnotationPostProcessorTests { @Before public void init() { - messageBus.setApplicationContext(context); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - inputChannel.setBeanName("input"); - outputChannel.setBeanName("output"); - context.getBeanFactory().registerSingleton("input", inputChannel); - context.getBeanFactory().registerSingleton("output", outputChannel); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); + context.registerChannel("input", inputChannel); + context.registerChannel("output", outputChannel); } @@ -67,10 +57,10 @@ public class RouterAnnotationPostProcessorTests { TestRouter testRouter = new TestRouter(); postProcessor.postProcessAfterInitialization(testRouter, "test"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("foo")); Message replyMessage = outputChannel.receive(0); assertEquals("foo", replyMessage.getPayload()); + context.stop(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java index ba56ee9fc0..2d2736653b 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java @@ -23,25 +23,21 @@ import static org.junit.Assert.assertNull; import org.junit.Before; import org.junit.Test; -import org.springframework.context.support.GenericApplicationContext; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.Splitter; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.channel.QueueChannel; -import org.springframework.integration.config.xml.MessageBusParser; import org.springframework.integration.core.Message; import org.springframework.integration.message.StringMessage; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher */ public class SplitterAnnotationPostProcessorTests { - private GenericApplicationContext context = new GenericApplicationContext(); - - private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus(); + private TestApplicationContext context = TestUtils.createTestApplicationContext(); private DirectChannel inputChannel = new DirectChannel(); @@ -50,14 +46,8 @@ public class SplitterAnnotationPostProcessorTests { @Before public void init() { - inputChannel.setBeanName("input"); - outputChannel.setBeanName("output"); - context.getBeanFactory().registerSingleton("input", inputChannel); - context.getBeanFactory().registerSingleton("output", outputChannel); - context.getBeanFactory().registerSingleton( - MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus); - messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - messageBus.setApplicationContext(context); + context.registerChannel("input", inputChannel); + context.registerChannel("output", outputChannel); } @@ -69,7 +59,6 @@ public class SplitterAnnotationPostProcessorTests { TestSplitter splitter = new TestSplitter(); postProcessor.postProcessAfterInitialization(splitter, "testSplitter"); context.refresh(); - messageBus.start(); inputChannel.send(new StringMessage("this.is.a.test")); Message message1 = outputChannel.receive(500); assertNotNull(message1); @@ -84,7 +73,7 @@ public class SplitterAnnotationPostProcessorTests { assertNotNull(message4); assertEquals("test", message4.getPayload()); assertNull(outputChannel.receive(0)); - messageBus.stop(); + context.stop(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java index b159365e12..f5e5aa84ea 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java @@ -36,6 +36,7 @@ import org.springframework.integration.core.Message; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.core.MessageHeaders; import org.springframework.integration.message.MessageDeliveryException; +import org.springframework.integration.util.TestUtils; /** * @author Iwein Fuld @@ -59,6 +60,7 @@ public class SimpleMessagingGatewayTests { this.simpleMessagingGateway = new SimpleMessagingGateway(); this.simpleMessagingGateway.setRequestChannel(requestChannel); this.simpleMessagingGateway.setReplyChannel(replyChannel); + this.simpleMessagingGateway.setBeanFactory(TestUtils.createTestApplicationContext()); reset(allmocks); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java index 4180b9df4b..543840bfbf 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java @@ -26,14 +26,13 @@ import java.util.concurrent.TimeUnit; import org.junit.Test; -import org.springframework.context.support.GenericApplicationContext; -import org.springframework.integration.bus.ApplicationContextMessageBus; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.core.Message; import org.springframework.integration.core.MessagingException; import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.handler.MethodInvokingMessageHandler; import org.springframework.integration.util.TestUtils; +import org.springframework.integration.util.TestUtils.TestApplicationContext; /** * @author Mark Fisher @@ -75,27 +74,22 @@ public class MethodInvokingMessageHandlerTests { @Test public void subscription() throws Exception { - GenericApplicationContext context = new GenericApplicationContext(); + TestApplicationContext context = TestUtils.createTestApplicationContext(); SynchronousQueue queue = new SynchronousQueue(); TestBean testBean = new TestBean(queue); QueueChannel channel = new QueueChannel(); - channel.setBeanName("channel"); - context.getBeanFactory().registerSingleton("channel", channel); + context.registerChannel("channel", channel); Message message = new GenericMessage("testing"); channel.send(message); assertNull(queue.poll()); MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(testBean, "foo"); PollingConsumer endpoint = new PollingConsumer(channel, handler); - context.getBeanFactory().registerSingleton("testEndpoint", endpoint); - ApplicationContextMessageBus bus = new ApplicationContextMessageBus(); - bus.setTaskScheduler(TestUtils.createTaskScheduler(10)); - bus.setApplicationContext(context); + context.registerEndpoint("testEndpoint", endpoint); context.refresh(); - bus.start(); String result = queue.poll(1000, TimeUnit.MILLISECONDS); assertNotNull(result); assertEquals("testing", result); - bus.stop(); + context.stop(); } diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java b/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java index bfb5d0445f..bc231d2581 100644 --- a/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java +++ b/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java @@ -19,6 +19,16 @@ package org.springframework.integration.util; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import org.springframework.beans.DirectFieldAccessor; +import org.springframework.beans.FatalBeanException; +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.config.ConfigurableListableBeanFactory; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.integration.context.IntegrationContextUtils; +import org.springframework.integration.core.MessageChannel; +import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.scheduling.SimpleTaskScheduler; import org.springframework.integration.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -56,6 +66,12 @@ public abstract class TestUtils { return (T) value; } + public static TestApplicationContext createTestApplicationContext() { + TestApplicationContext context = new TestApplicationContext(); + registerBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, createTaskScheduler(10), context); + return context; + } + public static TaskScheduler createTaskScheduler(int poolSize) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(poolSize); @@ -64,4 +80,56 @@ public abstract class TestUtils { return new SimpleTaskScheduler(executor); } + private static void registerBean(String beanName, Object bean, BeanFactory beanFactory) { + Assert.notNull(beanName, "bean name must not be null"); + ConfigurableListableBeanFactory configurableListableBeanFactory = null; + if (beanFactory instanceof ConfigurableListableBeanFactory) { + configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory; + } + else if (beanFactory instanceof GenericApplicationContext) { + configurableListableBeanFactory = ((GenericApplicationContext) beanFactory).getBeanFactory(); + } + if (bean instanceof BeanNameAware) { + ((BeanNameAware) bean).setBeanName(beanName); + } + if (bean instanceof BeanFactoryAware) { + ((BeanFactoryAware) bean).setBeanFactory(beanFactory); + } + if (bean instanceof InitializingBean) { + try { + ((InitializingBean) bean).afterPropertiesSet(); + } + catch (Exception e) { + throw new FatalBeanException("failed to register bean with test context", e); + } + } + configurableListableBeanFactory.registerSingleton(beanName, bean); + } + + + public static class TestApplicationContext extends GenericApplicationContext { + + private TestApplicationContext() { + super(); + } + + public void registerChannel(String channelName, MessageChannel channel) { + if (channel.getName() != null) { + if (channelName == null) { + Assert.notNull(channel.getName(), "channel name must not be null"); + channelName = channel.getName(); + } + else { + Assert.isTrue(channel.getName().equals(channelName), + "channel name has already been set with a conflicting value"); + } + } + registerBean(channelName, channel, this); + } + + public void registerEndpoint(String endpointName, AbstractEndpoint endpoint) { + registerBean(endpointName, endpoint, this); + } + } + }