diff --git a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java
index 50557ae6f8..4c1cc4cb9e 100644
--- a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java
+++ b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java
@@ -50,7 +50,8 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor
}
}
- public void onApplicationEvent(ApplicationEvent event) {
+ @Override
+ protected void onEvent(ApplicationEvent event) {
if (CollectionUtils.isEmpty(this.eventTypes)) {
this.sendEventAsMessage(event);
return;
@@ -67,4 +68,12 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor
return this.sendMessage(MessageBuilder.withPayload(event).build());
}
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
}
diff --git a/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java b/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java
index 8a78f2b8ae..89b8a6e331 100644
--- a/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java
+++ b/org.springframework.integration.event/src/test/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapterTests.java
@@ -30,8 +30,6 @@ import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.integration.bus.MessageBusStartedEvent;
-import org.springframework.integration.bus.MessageBusStoppedEvent;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.Message;
@@ -76,26 +74,20 @@ public class ApplicationEventInboundChannelAdapterTests {
}
@Test
- public void messageBusAndApplicationContextEvents() {
+ public void applicationContextEvents() {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
"applicationEventInboundChannelAdapterTests.xml", this.getClass());
PollableChannel channel = (PollableChannel) context.getBean("channel");
- Message> busStartedEventMessage = channel.receive(0);
Message> contextRefreshedEventMessage = channel.receive(0);
- assertNotNull(busStartedEventMessage);
assertNotNull(contextRefreshedEventMessage);
- assertEquals(MessageBusStartedEvent.class, busStartedEventMessage.getPayload().getClass());
assertEquals(ContextRefreshedEvent.class, contextRefreshedEventMessage.getPayload().getClass());
context.start();
Message> startedEventMessage = channel.receive(0);
assertNotNull(startedEventMessage);
assertEquals(ContextStartedEvent.class, startedEventMessage.getPayload().getClass());
context.stop();
- Message> busStoppedEventMessage = channel.receive(0);
Message> contextStoppedEventMessage = channel.receive(0);
- assertNotNull(busStoppedEventMessage);
assertNotNull(contextStoppedEventMessage);
- assertEquals(MessageBusStoppedEvent.class, busStoppedEventMessage.getPayload().getClass());
assertEquals(ContextStoppedEvent.class, contextStoppedEventMessage.getPayload().getClass());
context.close();
Message> closedEventMessage = channel.receive(0);
diff --git a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java
index 6b9328a48b..432cdb2500 100644
--- a/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java
+++ b/org.springframework.integration.httpinvoker/src/main/java/org/springframework/integration/httpinvoker/HttpInvokerInboundGateway.java
@@ -22,7 +22,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.adapter.RemoteMessageHandler;
import org.springframework.integration.adapter.RemotingInboundGatewaySupport;
import org.springframework.integration.core.MessagingException;
@@ -59,12 +58,13 @@ import org.springframework.web.HttpRequestHandler;
*
* @author Mark Fisher
*/
-public class HttpInvokerInboundGateway extends RemotingInboundGatewaySupport implements HttpRequestHandler, InitializingBean {
+public class HttpInvokerInboundGateway extends RemotingInboundGatewaySupport implements HttpRequestHandler {
private volatile HttpInvokerServiceExporter exporter;
- public void afterPropertiesSet() {
+ @Override
+ protected void onInit() {
HttpInvokerServiceExporter exporter = new HttpInvokerServiceExporter();
exporter.setService(this);
exporter.setServiceInterface(RemoteMessageHandler.class);
diff --git a/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java b/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java
index e22e7d8a27..9b21fa5917 100644
--- a/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java
+++ b/org.springframework.integration.jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java
@@ -23,7 +23,6 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.springframework.beans.factory.DisposableBean;
-import org.springframework.context.Lifecycle;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.core.Message;
import org.springframework.integration.gateway.SimpleMessagingGateway;
@@ -41,7 +40,7 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public class JmsInboundGateway extends SimpleMessagingGateway implements Lifecycle, DisposableBean {
+public class JmsInboundGateway extends SimpleMessagingGateway implements DisposableBean {
private volatile AbstractMessageListenerContainer container;
@@ -190,19 +189,19 @@ public class JmsInboundGateway extends SimpleMessagingGateway implements Lifecyc
// Lifecycle implementation
- public boolean isRunning() {
- return (this.container != null && this.container.isRunning());
- }
-
- public void start() {
+ @Override
+ protected void doStart() {
this.initialize();
this.container.start();
+ super.doStart();
}
- public void stop() {
+ @Override
+ protected void doStop() {
if (this.container != null) {
this.container.stop();
}
+ super.doStop();
}
// DisposableBean implementation
diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java
index 9878db9da0..c113931550 100644
--- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java
+++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/JmsInboundChannelAdapterParserTests.java
@@ -81,8 +81,9 @@ public class JmsInboundChannelAdapterParserTests {
try {
new ClassPathXmlApplicationContext("jmsInboundWithDestinationOnly.xml", this.getClass());
}
- catch (RuntimeException e) {
- assertEquals(NoSuchBeanDefinitionException.class, e.getCause().getClass());
+ catch (BeanCreationException e) {
+ Throwable rootCause = e.getRootCause();
+ assertEquals(NoSuchBeanDefinitionException.class, rootCause.getClass());
throw e;
}
}
diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml
index a5895c10ce..7c04cfe1f5 100644
--- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml
+++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewayWithContainerSettings.xml
@@ -10,8 +10,6 @@
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd">
-
-
diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml
index 34aec969a7..4b81321099 100644
--- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml
+++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsGatewaysWithExtractPayloadAttributes.xml
@@ -10,8 +10,6 @@
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd">
-
-
diff --git a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml
index 40c13d5505..0c04fc0422 100644
--- a/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml
+++ b/org.springframework.integration.jms/src/test/java/org/springframework/integration/jms/config/jmsOutboundGatewayWithConverter.xml
@@ -10,8 +10,6 @@
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-1.0.xsd">
-
-
diff --git a/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java b/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java
index 2859336e9c..2ac616014f 100755
--- a/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java
+++ b/org.springframework.integration.mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java
@@ -20,7 +20,6 @@ import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
-import org.springframework.context.Lifecycle;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.endpoint.MessageProducerSupport;
@@ -37,16 +36,12 @@ import org.springframework.util.Assert;
* @author Arjen Poutsma
* @author Mark Fisher
*/
-public class ImapIdleChannelAdapter extends MessageProducerSupport implements Lifecycle {
+public class ImapIdleChannelAdapter extends MessageProducerSupport {
private final IdleTask idleTask = new IdleTask();
private volatile TaskExecutor taskExecutor;
- private volatile boolean running;
-
- private final Object lifecycleMonitor = new Object();
-
private final ImapMailReceiver mailReceiver;
@@ -70,33 +65,21 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Li
* Lifecycle implementation
*/
- public boolean isRunning() {
- return this.running;
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ if (this.taskExecutor == null) {
+ if (logger.isInfoEnabled()) {
+ logger.info("No TaskExecutor has been provided, will use a ["
+ + SimpleAsyncTaskExecutor.class + "] as the default.");
+ }
+ this.taskExecutor = new SimpleAsyncTaskExecutor();
+ }
+ this.taskExecutor.execute(this.idleTask);
}
- public void start() {
- synchronized (this.lifecycleMonitor) {
- if (!this.running) {
- if (this.taskExecutor == null) {
- if (logger.isInfoEnabled()) {
- logger.info("No TaskExecutor has been provided, will use a ["
- + SimpleAsyncTaskExecutor.class + "] as the default.");
- }
- this.taskExecutor = new SimpleAsyncTaskExecutor();
- }
- this.taskExecutor.execute(this.idleTask);
- }
- this.running = true;
- }
- }
-
- public void stop() {
- synchronized (this.lifecycleMonitor) {
- if (this.running) {
- this.idleTask.interrupt();
- }
- this.running = false;
- }
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ this.idleTask.interrupt();
}
diff --git a/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml b/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml
index 0b96d28d40..31b23ef20f 100644
--- a/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml
+++ b/org.springframework.integration.mail/src/test/java/org/springframework/integration/mail/config/pollingMailSourceParserTests.xml
@@ -10,6 +10,6 @@
http://www.springframework.org/schema/integration/mail
http://www.springframework.org/schema/integration/mail/spring-integration-mail-1.0.xsd">
-
+
diff --git a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java
index 16f1c3de1f..e52a837c5e 100644
--- a/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java
+++ b/org.springframework.integration.rmi/src/main/java/org/springframework/integration/rmi/RmiInboundGateway.java
@@ -72,7 +72,8 @@ public class RmiInboundGateway extends RemotingInboundGatewaySupport implements
this.remoteInvocationExecutor = remoteInvocationExecutor;
}
- public void afterPropertiesSet() throws RemoteException {
+ @Override
+ protected void onInit() throws RemoteException {
RmiServiceExporter exporter = new RmiServiceExporter();
if (this.registryHost != null) {
exporter.setRegistryHost(this.registryHost);
diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java
index af009ef68b..1611bda56e 100644
--- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java
+++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleInboundChannelAdapterParserTests.java
@@ -97,9 +97,7 @@ public class ConsoleInboundChannelAdapterParserTests {
catch (BeanCreationException e) {
beanCreationException = e;
}
- Throwable parentCause = beanCreationException.getCause().getCause();
- assertEquals(IllegalArgumentException.class, parentCause.getClass());
- Throwable rootCause = ((IllegalArgumentException) parentCause).getCause();
+ Throwable rootCause = beanCreationException.getRootCause();
assertEquals(UnsupportedEncodingException.class, rootCause.getClass());
}
diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java
index 08645c3014..4ed6050bf5 100644
--- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java
+++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/ConsoleOutboundChannelAdapterParserTests.java
@@ -108,9 +108,7 @@ public class ConsoleOutboundChannelAdapterParserTests {
catch (BeanCreationException e) {
beanCreationException = e;
}
- Throwable parentCause = beanCreationException.getCause().getCause();
- assertEquals(IllegalArgumentException.class, parentCause.getClass());
- Throwable rootCause = ((IllegalArgumentException) parentCause).getCause();
+ Throwable rootCause = beanCreationException.getRootCause();
assertEquals(UnsupportedEncodingException.class, rootCause.getClass());
}
diff --git a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml
index 46c54a7bb5..238f9cdd66 100644
--- a/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml
+++ b/org.springframework.integration.stream/src/test/java/org/springframework/integration/stream/config/consoleInboundChannelAdapterParserTests.xml
@@ -10,8 +10,8 @@
http://www.springframework.org/schema/integration/stream
http://www.springframework.org/schema/integration/stream/spring-integration-stream-1.0.xsd">
-
+
-
+
\ No newline at end of file
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java
index a8690fc447..f9640d5b24 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/bus/ApplicationContextMessageBus.java
@@ -16,175 +16,7 @@
package org.springframework.integration.bus;
-import java.util.Collection;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.springframework.beans.BeansException;
-import org.springframework.beans.factory.DisposableBean;
-import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor;
-import org.springframework.context.ApplicationContext;
-import org.springframework.context.ApplicationContextAware;
-import org.springframework.context.ApplicationEvent;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.Lifecycle;
-import org.springframework.context.event.ContextRefreshedEvent;
-import org.springframework.integration.endpoint.MessageEndpoint;
-import org.springframework.integration.scheduling.TaskScheduler;
-import org.springframework.integration.scheduling.TaskSchedulerAware;
-import org.springframework.util.Assert;
-
-/**
- * Spring Integration's standard Message Bus implementation. Serves as a
- * registry for Messages Endpoints. Manages their lifecycle, activates
- * subscriptions, and schedules pollers by delegating to a
- * {@link TaskScheduler}. Retrieves MessageChannels from the
- * ApplicationContext based on bean name.
- *
- * @author Mark Fisher
- * @author Marius Bogoevici
- */
-public class ApplicationContextMessageBus implements ApplicationContextAware, ApplicationListener, Lifecycle, DisposableBean {
-
- public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel";
-
-
- private final Log logger = LogFactory.getLog(this.getClass());
-
- private volatile TaskScheduler taskScheduler;
-
- private volatile ApplicationContext applicationContext;
-
- private volatile boolean autoStartup = true;
-
- private volatile boolean running;
-
- private final Object lifecycleMonitor = new Object();
-
-
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- Assert.notNull(applicationContext, "'applicationContext' must not be null");
- Assert.state(!(applicationContext.getBeanNamesForType(this.getClass()).length > 1),
- "Only one instance of '" + this.getClass().getSimpleName() + "' is allowed per ApplicationContext.");
- this.applicationContext = applicationContext;
- }
-
- /**
- * Set the {@link TaskScheduler} to use for scheduling message dispatchers.
- */
- public void setTaskScheduler(TaskScheduler taskScheduler) {
- this.taskScheduler = taskScheduler;
- }
-
- /**
- * Set whether to automatically start the bus after initialization.
- *
Default is 'true'; set this to 'false' to allow for manual startup
- * through the {@link #start()} method.
- */
- public void setAutoStartup(boolean autoStartup) {
- this.autoStartup = autoStartup;
- }
-
- private Collection getEndpoints() {
- GenericBeanFactoryAccessor accessor = new GenericBeanFactoryAccessor(this.applicationContext);
- return accessor.getBeansOfType(MessageEndpoint.class).values();
- }
-
- private void activateEndpoints() {
- for (MessageEndpoint endpoint : this.getEndpoints()) {
- if (endpoint != null) {
- this.activateEndpoint(endpoint);
- }
- }
- }
-
- private void deactivateEndpoints() {
- for (MessageEndpoint endpoint : this.getEndpoints()) {
- if (endpoint != null) {
- this.deactivateEndpoint(endpoint);
- }
- }
- }
-
- private void activateEndpoint(MessageEndpoint endpoint) {
- Assert.notNull(endpoint, "'endpoint' must not be null");
- if (endpoint instanceof TaskSchedulerAware) {
- ((TaskSchedulerAware) endpoint).setTaskScheduler(this.taskScheduler);
- }
- if (endpoint instanceof Lifecycle) {
- ((Lifecycle) endpoint).start();
- }
- if (logger.isInfoEnabled()) {
- logger.info("activated endpoint '" + endpoint + "'");
- }
- }
-
- private void deactivateEndpoint(MessageEndpoint endpoint) {
- Assert.notNull(endpoint, "'endpoint' must not be null");
- if (endpoint instanceof Lifecycle) {
- ((Lifecycle) endpoint).stop();
- if (this.logger.isInfoEnabled()) {
- logger.info("deactivated endpoint '" + endpoint + "'");
- }
- }
- }
-
- // Lifecycle implementation
-
- public boolean isRunning() {
- synchronized (this.lifecycleMonitor) {
- return this.running;
- }
- }
-
- public void start() {
- if (this.running) {
- return;
- }
- Assert.notNull(this.applicationContext, "ApplicationContext must not be null");
- Assert.notNull(this.taskScheduler, "TaskScheduler must not be null");
- synchronized (this.lifecycleMonitor) {
- this.activateEndpoints();
- this.taskScheduler.start();
- }
- this.running = true;
- this.applicationContext.publishEvent(new MessageBusStartedEvent(this));
- if (logger.isInfoEnabled()) {
- logger.info("message bus started");
- }
- }
-
- public void stop() {
- if (!this.isRunning()) {
- return;
- }
- synchronized (this.lifecycleMonitor) {
- this.deactivateEndpoints();
- this.running = false;
- this.taskScheduler.stop();
- }
- this.applicationContext.publishEvent(new MessageBusStoppedEvent(this));
- if (logger.isInfoEnabled()) {
- logger.info("message bus stopped");
- }
- }
-
- // ApplicationListener implementation
-
- public void onApplicationEvent(ApplicationEvent event) {
- if (event instanceof ContextRefreshedEvent && this.autoStartup) {
- this.start();
- }
- }
-
- // DisposableBean implementation
-
- public void destroy() throws Exception {
- this.stop();
- if (this.taskScheduler instanceof DisposableBean) {
- ((DisposableBean) this.taskScheduler).destroy();
- }
- }
+// TODO: placeholder only, delete after handling all dependencies
+public class ApplicationContextMessageBus {
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java
index 4b0ef49903..526a5ebaee 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/channel/MessagePublishingErrorHandler.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
+import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessagingException;
@@ -97,6 +98,10 @@ public class MessagePublishingErrorHandler implements ErrorHandler, BeanFactoryA
}
private MessageChannel resolveErrorChannel(Message> failedMessage) {
+ if (this.defaultErrorChannel == null && this.channelResolver != null) {
+ this.defaultErrorChannel = this.channelResolver.resolveChannelName(
+ IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
+ }
if (failedMessage == null || failedMessage.getHeaders().getErrorChannel() == null) {
return this.defaultErrorChannel;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
index 3857b0ce96..77cbc072ad 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java
@@ -22,11 +22,13 @@ import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.SubscribableChannel;
import org.springframework.integration.core.MessageChannel;
-import org.springframework.integration.endpoint.MessageEndpoint;
+import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.message.MessageHandler;
@@ -39,7 +41,7 @@ import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
-public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, InitializingBean {
+public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAware, BeanNameAware, InitializingBean, ApplicationListener {
private final MessageHandler handler;
@@ -61,7 +63,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
private volatile ConfigurableBeanFactory beanFactory;
- private volatile MessageEndpoint endpoint;
+ private volatile AbstractEndpoint endpoint;
private volatile boolean initialized;
@@ -74,10 +76,6 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
}
- public void setBeanName(String beanName) {
- this.beanName = beanName;
- }
-
public void setInputChannelName(String inputChannelName) {
this.inputChannelName = inputChannelName;
}
@@ -106,14 +104,22 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
this.transactionDefinition = transactionDefinition;
}
+ public void setBeanName(String beanName) {
+ this.beanName = beanName;
+ }
+
public void setBeanFactory(BeanFactory beanFactory) {
Assert.isInstanceOf(ConfigurableBeanFactory.class, beanFactory,
"a ConfigurableBeanFactory is required");
this.beanFactory = (ConfigurableBeanFactory) beanFactory;
}
- public void afterPropertiesSet() {
- Assert.hasText(this.inputChannelName, "inputChannelName is required");
+ public void afterPropertiesSet() throws Exception {
+ this.initializeEndpoint();
+ }
+
+ public boolean isSingleton() {
+ return true;
}
public Object getObject() throws Exception {
@@ -125,20 +131,17 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
public Class> getObjectType() {
if (this.endpoint == null) {
- return MessageEndpoint.class;
+ return AbstractEndpoint.class;
}
- return endpoint.getClass();
+ return this.endpoint.getClass();
}
- public boolean isSingleton() {
- return true;
- }
-
- private void initializeEndpoint() {
+ private void initializeEndpoint() throws Exception {
synchronized (this.initializationMonitor) {
if (this.initialized) {
return;
}
+ Assert.hasText(this.inputChannelName, "inputChannelName is required");
Assert.isTrue(this.beanFactory.containsBean(this.inputChannelName),
"no such input channel '" + this.inputChannelName + "' for endpoint '" + this.beanName + "'");
MessageChannel channel = (MessageChannel)
@@ -166,8 +169,17 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
throw new IllegalArgumentException(
"unsupported channel type: [" + channel.getClass() + "]");
}
+ this.endpoint.setBeanName(this.beanName);
+ this.endpoint.setBeanFactory(this.beanFactory);
+ if (this.endpoint instanceof InitializingBean) {
+ ((InitializingBean) this.endpoint).afterPropertiesSet();
+ }
this.initialized = true;
}
}
+ public void onApplicationEvent(ApplicationEvent event) {
+ this.endpoint.onApplicationEvent(event);
+ }
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
index 7c7059d163..8d2d9bac51 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java
@@ -20,7 +20,6 @@ import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import org.springframework.beans.factory.BeanFactoryAware;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.generic.GenericBeanFactoryAccessor;
import org.springframework.core.annotation.AnnotationUtils;
@@ -64,15 +63,6 @@ public abstract class AbstractMethodAnnotationPostProcessor, MethodAnnotationPostProcessor>> postProcessors =
new HashMap, MethodAnnotationPostProcessor>>();
+ private Set listeners = new HashSet();
+
public void setBeanFactory(BeanFactory beanFactory) {
Assert.isAssignable(ConfigurableListableBeanFactory.class, beanFactory.getClass(),
@@ -74,7 +77,6 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean
public void afterPropertiesSet() {
Assert.notNull(this.beanFactory, "BeanFactory must not be null");
- this.messageBus = (Lifecycle) this.beanFactory.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
postProcessors.put(Aggregator.class, new AggregatorAnnotationPostProcessor(this.beanFactory));
postProcessors.put(ChannelAdapter.class, new ChannelAdapterAnnotationPostProcessor(this.beanFactory));
postProcessors.put(Router.class, new RouterAnnotationPostProcessor(this.beanFactory));
@@ -108,8 +110,19 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean
((BeanNameAware) result).setBeanName(endpointBeanName);
}
beanFactory.registerSingleton(endpointBeanName, result);
- if (messageBus.isRunning() && result instanceof Lifecycle) {
- ((Lifecycle) result).start();
+ if (result instanceof BeanFactoryAware) {
+ ((BeanFactoryAware) result).setBeanFactory(beanFactory);
+ }
+ if (result instanceof InitializingBean) {
+ try {
+ ((InitializingBean) result).afterPropertiesSet();
+ }
+ catch (Exception e) {
+ throw new BeanInitializationException("failed to initialize annotated component", e);
+ }
+ }
+ if (result instanceof ApplicationListener) {
+ listeners.add((ApplicationListener) result);
}
}
}
@@ -158,4 +171,10 @@ public class MessagingAnnotationPostProcessor implements BeanPostProcessor, Bean
return name;
}
+ public void onApplicationEvent(ApplicationEvent event) {
+ for (ApplicationListener listener : listeners) {
+ listener.onApplicationEvent(event);
+ }
+ }
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java
index cf6e654fb2..8fa97b4988 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractConsumerEndpointParser.java
@@ -87,6 +87,7 @@ public abstract class AbstractConsumerEndpointParser extends AbstractSingleBeanD
@Override
protected final void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
+ IntegrationNamespaceUtils.registerTaskSchedulerIfNecessary(parserContext.getRegistry());
BeanDefinitionBuilder consumerBuilder = this.parseConsumer(element, parserContext);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, OUTPUT_CHANNEL_ATTRIBUTE);
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(consumerBuilder, element, SELECTOR_ATTRIBUTE);
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java
index 5caffe7cf8..98e52d242b 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java
@@ -23,6 +23,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.scheduling.IntervalTrigger;
+import org.springframework.integration.util.LifecycleSupport.AutoStartMode;
import org.springframework.util.Assert;
import org.springframework.util.xml.DomUtils;
@@ -52,6 +53,10 @@ public abstract class AbstractPollingInboundChannelAdapterParser extends Abstrac
else {
adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval()));
}
+ String autoStart = element.getAttribute("auto-startup");
+ if ("false".equals(autoStart)) {
+ adapterBuilder.addPropertyValue("autoStartMode", AutoStartMode.NONE);
+ }
return adapterBuilder.getBeanDefinition();
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java
index 61f66d1172..cb05d7adb0 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/IntegrationNamespaceUtils.java
@@ -23,11 +23,19 @@ import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.parsing.BeanComponentDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
+import org.springframework.beans.factory.support.BeanDefinitionRegistry;
+import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.BeanDefinitionParserDelegate;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.core.Conventions;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.integration.channel.MessagePublishingErrorHandler;
+import org.springframework.integration.channel.QueueChannel;
+import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.scheduling.CronTrigger;
import org.springframework.integration.scheduling.IntervalTrigger;
+import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.Assert;
@@ -195,4 +203,32 @@ public abstract class IntegrationNamespaceUtils {
targetBuilder.addPropertyValue("transactionDefinition", txDefinition);
}
+ /**
+ * Register a TaskScheduler in the given BeanDefinitionRegistry if not yet present.
+ * The bean name for which this is checking is defined by the constant
+ * {@link IntegrationContextUtils#TASK_SCHEDULER_BEAN_NAME}.
+ */
+ public static synchronized void registerTaskSchedulerIfNecessary(BeanDefinitionRegistry registry) {
+ if (!registry.containsBeanDefinition(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
+ RootBeanDefinition errorChannelDef = new RootBeanDefinition(QueueChannel.class);
+ BeanDefinitionHolder errorChannelHolder = new BeanDefinitionHolder(
+ errorChannelDef, IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
+ BeanDefinitionReaderUtils.registerBeanDefinition(errorChannelHolder, registry);
+ }
+ TaskExecutor taskExecutor = null;
+ if (!registry.containsBeanDefinition(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME)) {
+ taskExecutor = IntegrationContextUtils.createTaskExecutor(2, 100, 0, "integration-main-");
+ BeanDefinitionBuilder schedulerBuilder = BeanDefinitionBuilder.genericBeanDefinition(SimpleTaskScheduler.class);
+ schedulerBuilder.addConstructorArgValue(taskExecutor);
+ BeanDefinitionBuilder errorHandlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MessagePublishingErrorHandler.class);
+ errorHandlerBuilder.addPropertyReference("defaultErrorChannel", IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME);
+ String errorHandlerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName(
+ errorHandlerBuilder.getBeanDefinition(), registry);
+ schedulerBuilder.addPropertyReference("errorHandler", errorHandlerBeanName);
+ BeanDefinitionHolder schedulerHolder = new BeanDefinitionHolder(
+ schedulerBuilder.getBeanDefinition(), IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME);
+ BeanDefinitionReaderUtils.registerBeanDefinition(schedulerHolder, registry);
+ }
+ }
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java
index c0ac34f770..232f40e650 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/MessageBusParser.java
@@ -17,7 +17,6 @@
package org.springframework.integration.config.xml;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.w3c.dom.Element;
@@ -27,21 +26,14 @@ import org.springframework.beans.factory.config.BeanDefinitionHolder;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
-import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.AbstractSimpleBeanDefinitionParser;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.bus.ApplicationContextMessageBus;
-import org.springframework.integration.channel.MessagePublishingErrorHandler;
-import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
-import org.springframework.integration.scheduling.SimpleTaskScheduler;
-import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.Assert;
-import org.springframework.util.StringUtils;
+import org.springframework.integration.context.IntegrationContextUtils;
/**
* Parser for the <message-bus> element of the integration namespace.
@@ -51,35 +43,27 @@ import org.springframework.util.StringUtils;
*/
public class MessageBusParser extends AbstractSimpleBeanDefinitionParser {
- public static final String MESSAGE_BUS_BEAN_NAME = "internal.MessageBus";
-
- public static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
-
private static final String MESSAGING_ANNOTATION_POST_PROCESSOR_BEAN_NAME =
"internal.MessagingAnnotationPostProcessor";
- private static final String TASK_SCHEDULER_ATTRIBUTE = "task-scheduler";
-
private static final String ASYNC_EVENT_MULTICASTER_ATTRIBUTE = "configure-async-event-multicaster";
- @Override
- protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext)
- throws BeanDefinitionStoreException {
- Assert.state(!parserContext.getRegistry().containsBeanDefinition(MESSAGE_BUS_BEAN_NAME),
- "Only one Message Bus is allowed per ApplicationContext.");
- return MESSAGE_BUS_BEAN_NAME;
- }
-
@Override
protected Class> getBeanClass(Element element) {
return ApplicationContextMessageBus.class;
}
+ @Override
+ protected String resolveId(Element element, AbstractBeanDefinition definition, ParserContext parserContext)
+ throws BeanDefinitionStoreException {
+ return "messageBus";
+ }
+
+
@Override
protected boolean isEligibleAttribute(String attributeName) {
- return !TASK_SCHEDULER_ATTRIBUTE.equals(attributeName) &&
- !ASYNC_EVENT_MULTICASTER_ATTRIBUTE.equals(attributeName) &&
+ return !ASYNC_EVENT_MULTICASTER_ATTRIBUTE.equals(attributeName) &&
!"enable-annotations".equals(attributeName) &&
super.isEligibleAttribute(attributeName);
}
@@ -87,35 +71,9 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser {
@Override
protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
super.doParse(element, parserContext, builder);
- String taskSchedulerRef = element.getAttribute(TASK_SCHEDULER_ATTRIBUTE);
- TaskExecutor taskExecutor= null;
- if (!parserContext.getRegistry().containsBeanDefinition(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME)) {
- RootBeanDefinition errorChannelDef = new RootBeanDefinition(QueueChannel.class);
- BeanDefinitionHolder errorChannelHolder = new BeanDefinitionHolder(
- errorChannelDef, ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
- BeanDefinitionReaderUtils.registerBeanDefinition(errorChannelHolder, parserContext.getRegistry());
- }
- if (StringUtils.hasText(taskSchedulerRef)) {
- builder.addPropertyReference("taskScheduler", taskSchedulerRef);
- }
- else {
- taskExecutor = this.createTaskExecutor(2, 100, 0, "message-bus-");
- BeanDefinitionBuilder schedulerBuilder = BeanDefinitionBuilder.genericBeanDefinition(SimpleTaskScheduler.class);
- schedulerBuilder.addConstructorArgValue(taskExecutor);
- BeanDefinitionBuilder errorHandlerBuilder = BeanDefinitionBuilder.genericBeanDefinition(MessagePublishingErrorHandler.class);
- errorHandlerBuilder.addPropertyReference("defaultErrorChannel", ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
- String errorHandlerBeanName = BeanDefinitionReaderUtils.registerWithGeneratedName(
- errorHandlerBuilder.getBeanDefinition(), parserContext.getRegistry());
- schedulerBuilder.addPropertyReference("errorHandler", errorHandlerBeanName);
- BeanDefinitionHolder schedulerHolder = new BeanDefinitionHolder(
- schedulerBuilder.getBeanDefinition(), TASK_SCHEDULER_BEAN_NAME);
- BeanDefinitionReaderUtils.registerBeanDefinition(schedulerHolder, parserContext.getRegistry());
- builder.addPropertyReference("taskScheduler", TASK_SCHEDULER_BEAN_NAME);
- }
+ IntegrationNamespaceUtils.registerTaskSchedulerIfNecessary(parserContext.getRegistry());
if ("true".equals(element.getAttribute(ASYNC_EVENT_MULTICASTER_ATTRIBUTE).toLowerCase())) {
- if (taskExecutor == null) {
- taskExecutor = this.createTaskExecutor(1, 10, 0, "event-multicaster-");
- }
+ TaskExecutor taskExecutor = IntegrationContextUtils.createTaskExecutor(1, 10, 0, "event-multicaster-");
BeanDefinitionBuilder eventMulticasterBuilder = BeanDefinitionBuilder.genericBeanDefinition(
SimpleApplicationEventMulticaster.class);
eventMulticasterBuilder.addPropertyValue("taskExecutor", taskExecutor);
@@ -127,19 +85,6 @@ public class MessageBusParser extends AbstractSimpleBeanDefinitionParser {
this.addPostProcessors(element, parserContext);
}
- private TaskExecutor createTaskExecutor(int coreSize, int maxSize, int queueCapacity, String threadPrefix) {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(coreSize);
- executor.setMaxPoolSize(maxSize);
- executor.setQueueCapacity(queueCapacity);
- if (StringUtils.hasText(threadPrefix)) {
- executor.setThreadFactory(new CustomizableThreadFactory(threadPrefix));
- }
- executor.setRejectedExecutionHandler(new CallerRunsPolicy());
- executor.afterPropertiesSet();
- return executor;
- }
-
/**
* Adds extra post-processors to the context, to inject the objects configured by the MessageBus
*/
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd
index 216f8d3272..fd11a928f1 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/spring-integration-1.0.xsd
@@ -24,8 +24,6 @@
-
-
@@ -175,6 +173,7 @@
+
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java
new file mode 100644
index 0000000000..1a20798e95
--- /dev/null
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationContextUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.context;
+
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
+
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.core.task.TaskExecutor;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+/**
+ * @author Mark Fisher
+ */
+public abstract class IntegrationContextUtils {
+
+ public static final String TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
+
+ public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel";
+
+
+ public static MessageChannel getErrorChannel(BeanFactory beanFactory) {
+ return getBeanOfType(beanFactory, ERROR_CHANNEL_BEAN_NAME, MessageChannel.class);
+ }
+
+ public static TaskScheduler getTaskScheduler(BeanFactory beanFactory) {
+ return getBeanOfType(beanFactory, TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class);
+ }
+
+ public static TaskScheduler getRequiredTaskScheduler(BeanFactory beanFactory) {
+ TaskScheduler taskScheduler = getTaskScheduler(beanFactory);
+ Assert.state(taskScheduler != null, "No such bean '" + TASK_SCHEDULER_BEAN_NAME + "'");
+ return taskScheduler;
+ }
+
+ @SuppressWarnings("unchecked")
+ private static T getBeanOfType(BeanFactory beanFactory, String beanName, Class type) {
+ if (!beanFactory.containsBean(beanName)) {
+ return null;
+ }
+ Object bean = beanFactory.getBean(beanName);
+ Assert.state(type.isAssignableFrom(bean.getClass()), "incorrect type for bean '" + beanName
+ + "' expected [" + type + "], but actual type is [" + bean.getClass() + "].");
+ return (T) bean;
+ }
+
+ public static TaskExecutor createTaskExecutor(int coreSize, int maxSize, int queueCapacity, String threadPrefix) {
+ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+ executor.setCorePoolSize(coreSize);
+ executor.setMaxPoolSize(maxSize);
+ executor.setQueueCapacity(queueCapacity);
+ if (StringUtils.hasText(threadPrefix)) {
+ executor.setThreadFactory(new CustomizableThreadFactory(threadPrefix));
+ }
+ executor.setRejectedExecutionHandler(new CallerRunsPolicy());
+ executor.afterPropertiesSet();
+ return executor;
+ }
+
+}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
index 820a3a2cc6..415730ebc3 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java
@@ -16,29 +16,58 @@
package org.springframework.integration.endpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.integration.context.IntegrationContextUtils;
+import org.springframework.integration.scheduling.TaskScheduler;
+import org.springframework.integration.util.LifecycleSupport;
+import org.springframework.util.Assert;
/**
* The base class for Message Endpoint implementations.
*
* @author Mark Fisher
*/
-public abstract class AbstractEndpoint implements MessageEndpoint, BeanNameAware {
+public abstract class AbstractEndpoint extends LifecycleSupport implements MessageEndpoint, BeanNameAware, BeanFactoryAware {
- protected final Log logger = LogFactory.getLog(this.getClass());
+ private volatile String beanName;
- private volatile String name;
+ private volatile BeanFactory beanFactory;
+
+ private volatile TaskScheduler taskScheduler;
- public void setBeanName(String name) {
- this.name = name;
+ public void setBeanName(String beanName) {
+ this.beanName = beanName;
}
+ // TODO: make this final (see TODO in GatewayProxyFactoryBean)
+ public void setBeanFactory(BeanFactory beanFactory) {
+ Assert.notNull(beanFactory, "beanFactory must not be null");
+ this.beanFactory = beanFactory;
+ TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory);
+ if (taskScheduler != null) {
+ this.setTaskScheduler(taskScheduler);
+ }
+ }
+
+ protected BeanFactory getBeanFactory() {
+ return this.beanFactory;
+ }
+
+ public void setTaskScheduler(TaskScheduler taskScheduler) {
+ Assert.notNull(taskScheduler, "taskScheduler must not be null");
+ this.taskScheduler = taskScheduler;
+ }
+
+ protected TaskScheduler getTaskScheduler() {
+ return this.taskScheduler;
+ }
+
+ @Override
public String toString() {
- return (this.name != null) ? this.name : super.toString();
+ return (this.beanName != null) ? this.beanName : super.toString();
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java
index 44fa623b5c..a20efe7631 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java
@@ -25,11 +25,8 @@ import org.aopalliance.aop.Advice;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.InitializingBean;
-import org.springframework.context.Lifecycle;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.scheduling.IntervalTrigger;
-import org.springframework.integration.scheduling.TaskScheduler;
-import org.springframework.integration.scheduling.TaskSchedulerAware;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
@@ -43,8 +40,7 @@ import org.springframework.util.ClassUtils;
/**
* @author Mark Fisher
*/
-public abstract class AbstractPollingEndpoint extends AbstractEndpoint
- implements TaskSchedulerAware, Lifecycle, InitializingBean, BeanClassLoaderAware {
+public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements InitializingBean, BeanClassLoaderAware {
public static final int MAX_MESSAGES_UNBOUNDED = -1;
@@ -65,15 +61,13 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint
private volatile ClassLoader classLoader = ClassUtils.getDefaultClassLoader();
- private volatile TaskScheduler taskScheduler;
-
private volatile ScheduledFuture> runningTask;
private volatile Runnable poller;
private volatile boolean initialized;
- private final Object lifecycleMonitor = new Object();
+ private final Object initializationMonitor = new Object();
public void setTrigger(Trigger trigger) {
@@ -93,10 +87,6 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
- public void setTaskScheduler(TaskScheduler taskScheduler) {
- this.taskScheduler = taskScheduler;
- }
-
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
@@ -126,13 +116,14 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint
private TransactionTemplate getTransactionTemplate() {
if (!this.initialized) {
- this.afterPropertiesSet();
+ this.onInit();
}
return this.transactionTemplate;
}
- public void afterPropertiesSet() {
- synchronized (this.lifecycleMonitor) {
+ @Override
+ protected void onInit() {
+ synchronized (this.initializationMonitor) {
if (this.initialized) {
return;
}
@@ -163,35 +154,24 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint
}
- // Lifecycle implementation
+ // LifecycleSupport implementation
- public boolean isRunning() {
- synchronized (this.lifecycleMonitor) {
- return this.runningTask != null;
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ if (!this.initialized) {
+ this.onInit();
}
+ Assert.state(this.getTaskScheduler() != null,
+ "unable to start polling, no taskScheduler available");
+ this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger);
}
- public void start() {
- synchronized (this.lifecycleMonitor) {
- if (!this.initialized) {
- this.afterPropertiesSet();
- }
- if (this.isRunning()) {
- return;
- }
- Assert.state(this.taskScheduler != null,
- "unable to start polling, no taskScheduler available");
- this.runningTask = this.taskScheduler.schedule(this.poller, this.trigger);
- }
- }
-
- public void stop() {
- synchronized (this.lifecycleMonitor) {
- if (this.runningTask != null) {
- this.runningTask.cancel(true);
- }
- this.runningTask = null;
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ if (this.runningTask != null) {
+ this.runningTask.cancel(true);
}
+ this.runningTask = null;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java
index d5fb69c36c..4f4d19dbae 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/EventDrivenConsumer.java
@@ -16,7 +16,6 @@
package org.springframework.integration.endpoint;
-import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.SubscribableChannel;
import org.springframework.integration.message.MessageHandler;
import org.springframework.util.Assert;
@@ -27,16 +26,12 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public class EventDrivenConsumer extends AbstractEndpoint implements Lifecycle {
+public class EventDrivenConsumer extends AbstractEndpoint {
private final SubscribableChannel inputChannel;
private final MessageHandler handler;
- private volatile boolean running;
-
- private final Object lifecycleMonitor = new Object();
-
public EventDrivenConsumer(SubscribableChannel inputChannel, MessageHandler handler) {
Assert.notNull(inputChannel, "inputChannel must not be null");
@@ -46,28 +41,14 @@ public class EventDrivenConsumer extends AbstractEndpoint implements Lifecycle {
}
- public boolean isRunning() {
- synchronized (this.lifecycleMonitor) {
- return this.running;
- }
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ this.inputChannel.subscribe(this.handler);
}
- public void start() {
- synchronized (this.lifecycleMonitor) {
- if (!this.running) {
- this.inputChannel.subscribe(this.handler);
- this.running = true;
- }
- }
- }
-
- public void stop() {
- synchronized (this.lifecycleMonitor) {
- if (this.running) {
- this.inputChannel.unsubscribe(this.handler);
- this.running = false;
- }
- }
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ this.inputChannel.unsubscribe(this.handler);
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java
index 0d70a98a04..4c7dd24abd 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java
@@ -16,7 +16,6 @@
package org.springframework.integration.endpoint;
-import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.channel.MessageChannelTemplate;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
@@ -28,7 +27,7 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public abstract class MessageProducerSupport extends AbstractEndpoint implements InitializingBean {
+public abstract class MessageProducerSupport extends AbstractEndpoint {
private volatile MessageChannel outputChannel;
@@ -43,7 +42,8 @@ public abstract class MessageProducerSupport extends AbstractEndpoint implements
this.channelTemplate.setSendTimeout(sendTimeout);
}
- public void afterPropertiesSet() {
+ @Override
+ protected void onInit() {
Assert.notNull(this.outputChannel, "outputChannel is required");
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java
index ae6c8adb21..a668ed7b7a 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java
@@ -59,15 +59,16 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint {
this.channelTemplate.setSendTimeout(sendTimeout);
}
- public void afterPropertiesSet() {
+ @Override
+ protected void onInit() {
Assert.notNull(this.source, "source must not be null");
Assert.notNull(this.outputChannel, "outputChannel must not be null");
- super.afterPropertiesSet();
if (this.maxMessagesPerPoll < 0) {
// the default is 1 since a source might return
// a non-null value every time it is invoked
this.setMaxMessagesPerPoll(1);
}
+ super.onInit();
}
@Override
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java
index 608aaa38f6..44fe7abb82 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/AbstractMessagingGateway.java
@@ -23,6 +23,7 @@ import org.springframework.integration.channel.SubscribableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessagingException;
+import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.MessageEndpoint;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -31,8 +32,6 @@ import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.message.ErrorMessage;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.message.MessageDeliveryException;
-import org.springframework.integration.scheduling.TaskScheduler;
-import org.springframework.integration.scheduling.TaskSchedulerAware;
import org.springframework.util.Assert;
/**
@@ -43,7 +42,7 @@ import org.springframework.util.Assert;
*
* @author Mark Fisher
*/
-public abstract class AbstractMessagingGateway implements MessagingGateway, MessageEndpoint, TaskSchedulerAware, Lifecycle {
+public abstract class AbstractMessagingGateway extends AbstractEndpoint implements MessagingGateway {
private volatile MessageChannel requestChannel;
@@ -53,16 +52,10 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess
private volatile boolean shouldThrowErrors = true;
- private volatile TaskScheduler taskScheduler;
-
private volatile MessageEndpoint replyMessageCorrelator;
private final Object replyMessageCorrelatorMonitor = new Object();
- private volatile boolean running;
-
- private final Object lifecycleMonitor = new Object();
-
/**
* Set the request channel.
@@ -113,10 +106,6 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess
this.shouldThrowErrors = shouldThrowErrors;
}
- public void setTaskScheduler(TaskScheduler taskScheduler) {
- this.taskScheduler = taskScheduler;
- }
-
public void send(Object object) {
Assert.state(this.requestChannel != null,
"send is not supported, because no request channel has been configured");
@@ -190,7 +179,7 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess
else if (this.replyChannel instanceof PollableChannel) {
PollingConsumer endpoint = new PollingConsumer(
(PollableChannel) this.replyChannel, handler);
- endpoint.setTaskScheduler(this.taskScheduler);
+ endpoint.setBeanFactory(this.getBeanFactory());
endpoint.afterPropertiesSet();
correlator = endpoint;
}
@@ -201,25 +190,17 @@ public abstract class AbstractMessagingGateway implements MessagingGateway, Mess
}
}
- public boolean isRunning() {
- return this.running;
- }
-
- public void start() {
- synchronized (this.lifecycleMonitor) {
- if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) {
- ((Lifecycle) this.replyMessageCorrelator).start();
- }
- this.running = true;
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) {
+ ((Lifecycle) this.replyMessageCorrelator).start();
}
}
- public void stop() {
- synchronized (this.lifecycleMonitor) {
- if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) {
- ((Lifecycle) this.replyMessageCorrelator).stop();
- }
- this.running = false;
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ if (this.replyMessageCorrelator != null && this.replyMessageCorrelator instanceof Lifecycle) {
+ ((Lifecycle) this.replyMessageCorrelator).stop();
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
index cf54263534..097c01fab2 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/gateway/GatewayProxyFactoryBean.java
@@ -37,11 +37,10 @@ import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.PollableChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.message.MethodParameterMessageMapper;
-import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;
@@ -52,8 +51,8 @@ import org.springframework.util.StringUtils;
*
* @author Mark Fisher
*/
-public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor, BeanClassLoaderAware, BeanFactoryAware,
- InitializingBean, Lifecycle {
+public class GatewayProxyFactoryBean extends AbstractEndpoint implements FactoryBean, MethodInterceptor, BeanClassLoaderAware, BeanFactoryAware,
+ InitializingBean {
private volatile Class> serviceInterface;
@@ -73,18 +72,12 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor,
private final Map gatewayMap = new HashMap();
- private volatile TaskScheduler taskScheduler;
-
private volatile ChannelResolver channelResolver;
private volatile boolean initialized;
- private volatile boolean running;
-
private final Object initializationMonitor = new Object();
- private final Object lifecycleMonitor = new Object();
-
public void setServiceInterface(Class> serviceInterface) {
if (serviceInterface != null && !serviceInterface.isInterface()) {
@@ -144,12 +137,14 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor,
this.beanClassLoader = beanClassLoader;
}
+ @Override // TODO: remove this and move channelResolver to parent class
public void setBeanFactory(BeanFactory beanFactory) {
- this.taskScheduler = (TaskScheduler) beanFactory.getBean(MessageBusParser.TASK_SCHEDULER_BEAN_NAME);
+ super.setBeanFactory(beanFactory);
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
}
- public void afterPropertiesSet() throws Exception {
+ @Override
+ protected void onInit() throws Exception {
synchronized (this.initializationMonitor) {
if (this.initialized) {
return;
@@ -226,7 +221,9 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor,
private MessagingGateway createGatewayForMethod(Method method) throws Exception {
SimpleMessagingGateway gateway = new SimpleMessagingGateway(
new MethodParameterMessageMapper(method), new SimpleMessageMapper());
- gateway.setTaskScheduler(this.taskScheduler);
+ if (this.getTaskScheduler() != null) {
+ gateway.setTaskScheduler(this.getTaskScheduler());
+ }
Gateway gatewayAnnotation = method.getAnnotation(Gateway.class);
MessageChannel requestChannel = this.defaultRequestChannel;
MessageChannel replyChannel = this.defaultReplyChannel;
@@ -251,37 +248,28 @@ public class GatewayProxyFactoryBean implements FactoryBean, MethodInterceptor,
gateway.setReplyChannel(replyChannel);
gateway.setRequestTimeout(requestTimeout);
gateway.setReplyTimeout(replyTimeout);
+ if (this.getBeanFactory() != null) {
+ gateway.setBeanFactory(this.getBeanFactory());
+ }
return gateway;
}
// Lifecycle implementation
- public boolean isRunning() {
- return this.running;
- }
-
- public void start() {
- synchronized (this.lifecycleMonitor) {
- if (!this.running) {
- for (MessagingGateway gateway : this.gatewayMap.values()) {
- if (gateway instanceof Lifecycle) {
- ((Lifecycle) gateway).start();
- }
- }
- this.running = true;
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ for (MessagingGateway gateway : this.gatewayMap.values()) {
+ if (gateway instanceof Lifecycle) {
+ ((Lifecycle) gateway).start();
}
}
}
- public void stop() {
- synchronized (this.lifecycleMonitor) {
- if (this.running) {
- for (MessagingGateway gateway : this.gatewayMap.values()) {
- if (gateway instanceof Lifecycle) {
- ((Lifecycle) gateway).stop();
- }
- }
- this.running = false;
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ for (MessagingGateway gateway : this.gatewayMap.values()) {
+ if (gateway instanceof Lifecycle) {
+ ((Lifecycle) gateway).stop();
}
}
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
index fbeeacc4be..e6d5f3c406 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
@@ -26,14 +26,18 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskExecutor;
+import org.springframework.integration.channel.BeanFactoryChannelResolver;
+import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandler;
+import org.springframework.integration.util.LifecycleSupport;
import org.springframework.scheduling.SchedulingException;
import org.springframework.util.Assert;
@@ -43,7 +47,7 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @author Marius Bogoevici
*/
-public class SimpleTaskScheduler implements TaskScheduler, DisposableBean {
+public class SimpleTaskScheduler extends LifecycleSupport implements TaskScheduler, BeanFactoryAware, DisposableBean {
private final Log logger = LogFactory.getLog(this.getClass());
@@ -57,14 +61,11 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean {
private final Set> executingTasks = Collections.synchronizedSet(new TreeSet>());
- private volatile boolean running;
-
- private final ReentrantLock lifecycleLock = new ReentrantLock();
-
public SimpleTaskScheduler(TaskExecutor executor) {
Assert.notNull(executor, "executor must not be null");
this.executor = executor;
+ this.setAutoStartMode(AutoStartMode.ON_CONTEXT_REFRESH);
}
@@ -72,7 +73,14 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean {
this.errorHandler = errorHandler;
}
+ public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
+ if (this.errorHandler == null) {
+ this.errorHandler = new MessagePublishingErrorHandler(new BeanFactoryChannelResolver(beanFactory));
+ }
+ }
+
public final ScheduledFuture> schedule(Runnable task, Trigger trigger) {
+ Assert.notNull(task, "task must not be null");
TriggeredTask triggeredTask = new TriggeredTask(task, trigger);
return this.schedule(triggeredTask, null, null);
}
@@ -87,56 +95,28 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean {
}
- // Lifecycle implementation
+ // LifecycleSupport implementation
- public boolean isRunning() {
- this.lifecycleLock.lock();
- try {
- return this.running;
- }
- finally {
- this.lifecycleLock.unlock();
- }
+ @Override // guarded by super#lifecycleLock
+ protected void doStart() {
+ this.executor.execute(this.schedulerTask = new SchedulerTask());
}
- public void start() {
- this.lifecycleLock.lock();
- try {
- if (this.running) {
- return;
- }
- this.running = true;
- this.executor.execute(this.schedulerTask = new SchedulerTask());
+ @Override // guarded by super#lifecycleLock
+ protected void doStop() {
+ this.schedulerTask.deactivate();
+ Thread executingThread = this.schedulerTask.executingThread.get();
+ if (executingThread != null) {
+ executingThread.interrupt();
}
- finally {
- this.lifecycleLock.unlock();
- }
- }
-
- public void stop() {
- this.lifecycleLock.lock();
- try {
- if (!this.running) {
- return;
+ this.scheduledTasks.clear();
+ synchronized (this.executingTasks) {
+ for (TriggeredTask> task : this.executingTasks) {
+ task.cancel(true);
}
- this.running = false;
- this.schedulerTask.deactivate();
- Thread executingThread = this.schedulerTask.executingThread.get();
- if (executingThread != null) {
- executingThread.interrupt();
- }
- this.scheduledTasks.clear();
- synchronized (this.executingTasks) {
- for (TriggeredTask> task : this.executingTasks) {
- task.cancel(true);
- }
- this.executingTasks.clear();
- }
- this.schedulerTask = null;
- }
- finally {
- this.lifecycleLock.unlock();
+ this.executingTasks.clear();
}
+ this.schedulerTask = null;
}
public void destroy() throws Exception {
@@ -236,7 +216,7 @@ public class SimpleTaskScheduler implements TaskScheduler, DisposableBean {
public long getDelay(TimeUnit unit) {
long now = new Date().getTime();
- long scheduled = this.scheduledTime.getTime();
+ long scheduled = (this.scheduledTime != null) ? this.scheduledTime.getTime() : now;
return (scheduled > now) ? unit.convert(scheduled - now, TimeUnit.MILLISECONDS) : 0;
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java
new file mode 100644
index 0000000000..64406d3305
--- /dev/null
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2002-2008 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.integration.util;
+
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.springframework.beans.factory.BeanInitializationException;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.Lifecycle;
+import org.springframework.context.event.ContextRefreshedEvent;
+
+/**
+ * A convenience base class for Lifecycle components that supports an
+ * "auto-startup" mode property. Depending on the mode, the component can
+ * be started either upon initialization, upon receiving the
+ * {@link ContextRefreshedEvent}, or may require an explicit start invocation.
+ * The timing of the startup is determined by the value of {@link #autoStartMode}.
+ * The default value is {@link AutoStartMode#ON_INIT}. To require explicit startup,
+ * set the mode to {@link AutoStartMode#NONE} using the
+ * {@link #setAutoStartMode(AutoStartMode)} method.
+ *
+ * @author Mark Fisher
+ */
+public abstract class LifecycleSupport implements Lifecycle, InitializingBean, ApplicationListener {
+
+ public static enum AutoStartMode { ON_INIT, ON_CONTEXT_REFRESH, NONE }
+
+
+ protected final Log logger = LogFactory.getLog(this.getClass());
+
+ private volatile AutoStartMode autoStartMode = AutoStartMode.ON_INIT;
+
+ private volatile boolean running;
+
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
+
+
+ public void setAutoStartMode(AutoStartMode autoStartMode) {
+ this.autoStartMode = (autoStartMode != null) ? autoStartMode : AutoStartMode.NONE;
+ }
+
+ public final void afterPropertiesSet() {
+ try {
+ this.onInit();
+ if (this.autoStartMode == AutoStartMode.ON_INIT) {
+ this.start();
+ }
+ }
+ catch (Exception e) {
+ throw new BeanInitializationException("failed to initialize", e);
+ }
+ }
+
+ public final void onApplicationEvent(ApplicationEvent event) {
+ this.onEvent(event);
+ if (event instanceof ContextRefreshedEvent && this.autoStartMode == AutoStartMode.ON_CONTEXT_REFRESH) {
+ this.start();
+ }
+ }
+
+ // Lifecycle implementation
+
+ public final boolean isRunning() {
+ this.lifecycleLock.lock();
+ try {
+ return this.running;
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ public final void start() {
+ this.lifecycleLock.lock();
+ try {
+ if (!this.running) {
+ this.doStart();
+ this.running = true;
+ if (logger.isInfoEnabled()) {
+ logger.info("started " + this);
+ }
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ public final void stop() {
+ this.lifecycleLock.lock();
+ try {
+ if (this.running) {
+ this.doStop();
+ this.running = false;
+ if (logger.isInfoEnabled()) {
+ logger.info("stopped " + this);
+ }
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ protected void onInit() throws Exception {
+ }
+
+ protected void onEvent(ApplicationEvent event) {
+ }
+
+ /**
+ * Subclasses must implement this method with the start behavior.
+ * This method will be invoked while holding the {@link #lifecycleLock}.
+ */
+ protected abstract void doStart();
+
+ /**
+ * Subclasses must implement this method with the stop behavior.
+ * This method will be invoked while holding the {@link #lifecycleLock}.
+ */
+ protected abstract void doStop();
+
+}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java
index d2cc9c4f53..02c4ef934c 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/ApplicationContextMessageBusTests.java
@@ -26,21 +26,18 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.springframework.beans.factory.BeanCreationException;
-import org.springframework.context.Lifecycle;
import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.PublishSubscribeChannel;
import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
+import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
+import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
-import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.message.ErrorMessage;
@@ -49,8 +46,8 @@ import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.MessageSource;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
-import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -59,13 +56,11 @@ public class ApplicationContextMessageBusTests {
@Test
public void endpointRegistrationWithInputChannelReference() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel sourceChannel = new QueueChannel();
QueueChannel targetChannel = new QueueChannel();
- sourceChannel.setBeanName("sourceChannel");
- targetChannel.setBeanName("targetChannel");
- context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
- context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
+ context.registerChannel("sourceChannel", sourceChannel);
+ context.registerChannel("targetChannel", targetChannel);
Message message = MessageBuilder.withPayload("test")
.setReplyChannelName("targetChannel").build();
sourceChannel.send(message);
@@ -76,37 +71,25 @@ public class ApplicationContextMessageBusTests {
};
handler.setBeanFactory(context);
PollingConsumer endpoint = new PollingConsumer(sourceChannel, handler);
- endpoint.afterPropertiesSet();
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
- bus.setApplicationContext(context);
- bus.start();
Message> result = targetChannel.receive(3000);
assertEquals("test", result.getPayload());
- bus.stop();
+ context.stop();
}
@Test
public void channelsWithoutHandlers() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel sourceChannel = new QueueChannel();
- sourceChannel.setBeanName("sourceChannel");
- context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
+ context.registerChannel("sourceChannel", sourceChannel);
sourceChannel.send(new StringMessage("test"));
QueueChannel targetChannel = new QueueChannel();
- targetChannel.setBeanName("targetChannel");
- context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
+ context.registerChannel("targetChannel", targetChannel);
context.refresh();
- bus.start();
Message> result = targetChannel.receive(100);
assertNull(result);
- bus.stop();
+ context.stop();
}
@Test
@@ -116,15 +99,13 @@ public class ApplicationContextMessageBusTests {
PollableChannel sourceChannel = (PollableChannel) context.getBean("sourceChannel");
sourceChannel.send(new GenericMessage("test"));
PollableChannel targetChannel = (PollableChannel) context.getBean("targetChannel");
- Lifecycle bus = (Lifecycle) context.getBean("bus");
- bus.start();
Message> result = targetChannel.receive(1000);
assertEquals("test", result.getPayload());
}
@Test
public void exactlyOneConsumerReceivesPointToPointMessage() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -140,35 +121,26 @@ public class ApplicationContextMessageBusTests {
replyHolder.set(message);
}
};
- inputChannel.setBeanName("input");
- outputChannel1.setBeanName("output1");
- outputChannel2.setBeanName("output2");
- context.getBeanFactory().registerSingleton("input", inputChannel);
- context.getBeanFactory().registerSingleton("output1", outputChannel1);
- context.getBeanFactory().registerSingleton("output2", outputChannel2);
+ context.registerChannel("input", inputChannel);
+ context.registerChannel("output1", outputChannel1);
+ context.registerChannel("output2", outputChannel2);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
PollingConsumer endpoint1 = new PollingConsumer(inputChannel, handler1);
- endpoint1.afterPropertiesSet();
PollingConsumer endpoint2 = new PollingConsumer(inputChannel, handler2);
- endpoint2.afterPropertiesSet();
- context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
- context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint1", endpoint1);
+ context.registerEndpoint("testEndpoint2", endpoint2);
context.refresh();
- bus.start();
inputChannel.send(new StringMessage("testing"));
Message> message1 = outputChannel1.receive(500);
Message> message2 = outputChannel2.receive(0);
- bus.stop();
+ context.stop();
assertTrue("exactly one message should be null", message1 == null ^ message2 == null);
}
@Test
public void bothConsumersReceivePublishSubscribeMessage() throws InterruptedException {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
PublishSubscribeChannel inputChannel = new PublishSubscribeChannel();
QueueChannel outputChannel1 = new QueueChannel();
QueueChannel outputChannel2 = new QueueChannel();
@@ -187,60 +159,45 @@ public class ApplicationContextMessageBusTests {
latch.countDown();
}
};
- inputChannel.setBeanName("input");
- outputChannel1.setBeanName("output1");
- outputChannel2.setBeanName("output2");
- context.getBeanFactory().registerSingleton("input", inputChannel);
- context.getBeanFactory().registerSingleton("output1", outputChannel1);
- context.getBeanFactory().registerSingleton("output2", outputChannel2);
+ context.registerChannel("input", inputChannel);
+ context.registerChannel("output1", outputChannel1);
+ context.registerChannel("output2", outputChannel2);
handler1.setOutputChannel(outputChannel1);
handler2.setOutputChannel(outputChannel2);
EventDrivenConsumer endpoint1 = new EventDrivenConsumer(inputChannel, handler1);
EventDrivenConsumer endpoint2 = new EventDrivenConsumer(inputChannel, handler2);
- context.getBeanFactory().registerSingleton("testEndpoint1", endpoint1);
- context.getBeanFactory().registerSingleton("testEndpoint2", endpoint2);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint1", endpoint1);
+ context.registerEndpoint("testEndpoint2", endpoint2);
context.refresh();
- bus.start();
inputChannel.send(new StringMessage("testing"));
latch.await(500, TimeUnit.MILLISECONDS);
assertEquals("both handlers should have been invoked", 0, latch.getCount());
Message> message1 = outputChannel1.receive(500);
Message> message2 = outputChannel2.receive(500);
- bus.stop();
+ context.stop();
assertNotNull("both handlers should have replied to the message", message1);
assertNotNull("both handlers should have replied to the message", message2);
}
@Test
public void errorChannelWithFailedDispatch() throws InterruptedException {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel errorChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- errorChannel.setBeanName("errorChannel");
- context.getBeanFactory().registerSingleton("errorChannel", errorChannel);
+ context.registerChannel("errorChannel", errorChannel);
CountDownLatch latch = new CountDownLatch(1);
SourcePollingChannelAdapter channelAdapter = new SourcePollingChannelAdapter();
channelAdapter.setSource(new FailingSource(latch));
channelAdapter.setTrigger(new IntervalTrigger(1000));
channelAdapter.setOutputChannel(outputChannel);
- channelAdapter.setBeanName("testChannel");
- context.getBeanFactory().registerSingleton("testChannel", channelAdapter);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- SimpleTaskScheduler taskScheduler = (SimpleTaskScheduler) TestUtils.createTaskScheduler(10);
+ context.registerEndpoint("testChannel", channelAdapter);
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler(channelResolver);
errorHandler.setDefaultErrorChannel(errorChannel);
- taskScheduler.setErrorHandler(errorHandler);
- bus.setTaskScheduler(taskScheduler);
- bus.setApplicationContext(context);
context.refresh();
- bus.start();
latch.await(2000, TimeUnit.MILLISECONDS);
Message> message = errorChannel.receive(5000);
- bus.stop();
+ context.stop();
assertNull(outputChannel.receive(0));
assertNotNull("message should not be null", message);
assertTrue(message instanceof ErrorMessage);
@@ -248,17 +205,11 @@ public class ApplicationContextMessageBusTests {
assertEquals("intentional test failure", exception.getMessage());
}
- @Test(expected = BeanCreationException.class)
- public void multipleMessageBusBeans() {
- new ClassPathXmlApplicationContext("multipleMessageBusBeans.xml", this.getClass());
- }
-
@Test
public void consumerSubscribedToErrorChannel() throws InterruptedException {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel errorChannel = new QueueChannel();
- errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
- context.getBeanFactory().registerSingleton(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel);
+ context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
final CountDownLatch latch = new CountDownLatch(1);
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
@@ -267,16 +218,12 @@ public class ApplicationContextMessageBusTests {
}
};
PollingConsumer endpoint = new PollingConsumer(errorChannel, handler);
- endpoint.afterPropertiesSet();
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- bus.start();
errorChannel.send(new ErrorMessage(new RuntimeException("test-exception")));
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals("handler should have received error message", 0, latch.getCount());
+ context.stop();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java
index b7428d06f9..16f1039a5c 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/DirectChannelSubscriptionTests.java
@@ -21,14 +21,13 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.channel.ThreadLocalChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
-import org.springframework.integration.config.xml.MessageBusParser;
+import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -37,15 +36,14 @@ import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.handler.ServiceActivatingHandler;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class DirectChannelSubscriptionTests {
- private GenericApplicationContext context = new GenericApplicationContext();
-
- private ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
+ private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel sourceChannel = new DirectChannel();
@@ -54,30 +52,23 @@ public class DirectChannelSubscriptionTests {
@Before
public void setupChannels() {
- sourceChannel.setBeanName("sourceChannel");
- targetChannel.setBeanName("targetChannel");
- context.getBeanFactory().registerSingleton("sourceChannel", sourceChannel);
- context.getBeanFactory().registerSingleton("targetChannel", targetChannel);
- context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, bus);
- bus.setApplicationContext(context);
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
+ context.registerChannel("sourceChannel", sourceChannel);
+ context.registerChannel("targetChannel", targetChannel);
}
@Test
public void sendAndReceiveForRegisteredEndpoint() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
ServiceActivatingHandler serviceActivator = new ServiceActivatingHandler(new TestBean(), "handle");
serviceActivator.setOutputChannel(targetChannel);
EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, serviceActivator);
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message> response = this.targetChannel.receive();
assertEquals("foo!", response.getPayload());
- bus.stop();
+ context.stop();
}
@Test
@@ -88,11 +79,10 @@ public class DirectChannelSubscriptionTests {
TestEndpoint endpoint = new TestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
- bus.start();
this.sourceChannel.send(new StringMessage("foo"));
Message> response = this.targetChannel.receive();
assertEquals("foo-from-annotated-endpoint", response.getPayload());
- bus.stop();
+ context.stop();
}
@Test(expected = MessagingException.class)
@@ -105,27 +95,32 @@ public class DirectChannelSubscriptionTests {
};
handler.setOutputChannel(targetChannel);
EventDrivenConsumer endpoint = new EventDrivenConsumer(sourceChannel, handler);
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- bus.start();
- this.sourceChannel.send(new StringMessage("foo"));
+ try {
+ this.sourceChannel.send(new StringMessage("foo"));
+ }
+ finally {
+ context.stop();
+ }
}
@Test(expected = MessagingException.class)
public void exceptionThrownFromAnnotatedEndpoint() {
QueueChannel errorChannel = new QueueChannel();
- errorChannel.setBeanName(ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME);
- context.getBeanFactory().registerSingleton(
- ApplicationContextMessageBus.ERROR_CHANNEL_BEAN_NAME, errorChannel);
+ context.registerChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME, errorChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
FailingTestEndpoint endpoint = new FailingTestEndpoint();
postProcessor.postProcessAfterInitialization(endpoint, "testEndpoint");
context.refresh();
- bus.start();
- this.sourceChannel.send(new StringMessage("foo"));
+ try {
+ this.sourceChannel.send(new StringMessage("foo"));
+ }
+ finally {
+ context.stop();
+ }
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java
deleted file mode 100644
index 037196c959..0000000000
--- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/MessageBusEventTests.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright 2002-2008 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.springframework.integration.bus;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-import org.junit.Test;
-
-import org.springframework.beans.factory.support.RootBeanDefinition;
-import org.springframework.context.ApplicationEvent;
-import org.springframework.context.ApplicationListener;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.integration.config.xml.MessageBusParser;
-import org.springframework.integration.util.TestUtils;
-
-/**
- * @author Marius Bogoevici
- * @author Mark Fisher
- */
-public class MessageBusEventTests {
-
- @Test
- public void messageBusStartedEvent() {
- GenericApplicationContext context = new GenericApplicationContext();
- context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class));
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
- context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
- assertNull(listener.startedBus);
- assertNull(listener.stoppedBus);
- context.refresh(); // bus will start
- assertNotNull(listener.startedBus);
- assertNull(listener.stoppedBus);
- assertEquals(messageBus, listener.startedBus);
- }
-
- @Test
- public void messageBusStoppedEvent() {
- GenericApplicationContext context = new GenericApplicationContext();
- context.registerBeanDefinition("listener", new RootBeanDefinition(TestMessageBusEventListener.class));
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
- context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
- assertNull(listener.startedBus);
- assertNull(listener.stoppedBus);
- context.refresh();
- assertNotNull(listener.startedBus);
- assertNull(listener.stoppedBus);
- messageBus.stop();
- assertNotNull(listener.stoppedBus);
- assertEquals(messageBus, listener.stoppedBus);
- }
-
-
- public static class TestMessageBusEventListener implements ApplicationListener {
-
- private volatile ApplicationContextMessageBus startedBus;
-
- private volatile ApplicationContextMessageBus stoppedBus;
-
-
- public ApplicationContextMessageBus getStartedBus() {
- return this.startedBus;
- }
-
- public ApplicationContextMessageBus getStoppedBus() {
- return this.stoppedBus;
- }
-
- public void onApplicationEvent(ApplicationEvent event) {
- if (event instanceof MessageBusStartedEvent) {
- this.startedBus = (ApplicationContextMessageBus) event.getSource();
- }
- if (event instanceof MessageBusStoppedEvent) {
- this.stoppedBus = (ApplicationContextMessageBus) event.getSource();
- }
- }
- }
-
-}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml
index bbd695a2ba..37e106e047 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/bus/messageBusTests.xml
@@ -4,13 +4,9 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
-
-
-
-
-
-
+
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml b/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml
deleted file mode 100644
index 2e6ab1b168..0000000000
--- a/org.springframework.integration/src/test/java/org/springframework/integration/bus/multipleMessageBusBeans.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-
-
-
-
-
-
-
-
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java
index cef324ca90..eb8dfe68f5 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/channel/MessageChannelTemplateTests.java
@@ -26,11 +26,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.PollingConsumer;
@@ -39,19 +38,22 @@ import org.springframework.integration.handler.ReplyMessageHolder;
import org.springframework.integration.message.MessageBuilder;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class MessageChannelTemplateTests {
+ private TestApplicationContext context = TestUtils.createTestApplicationContext();
+
private QueueChannel requestChannel;
@Before
public void setUp() {
this.requestChannel = new QueueChannel();
- this.requestChannel.setBeanName("requestChannel");
+ context.registerChannel("requestChannel", requestChannel);
AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() {
@Override
public void handleRequestMessage(Message> message, ReplyMessageHolder replyHolder) {
@@ -59,17 +61,14 @@ public class MessageChannelTemplateTests {
}
};
PollingConsumer endpoint = new PollingConsumer(requestChannel, handler);
- endpoint.afterPropertiesSet();
- GenericApplicationContext context = new GenericApplicationContext();
- context.getBeanFactory().registerSingleton("requestChannel", requestChannel);
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- bus.start();
}
+ @After
+ public void tearDown() {
+ context.stop();
+ }
@Test
public void send() {
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml
index bc016f29ef..7b16df9763 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests-context.xml
@@ -7,7 +7,7 @@
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-1.0.xsd">
-
+
@@ -17,7 +17,7 @@
-
+
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java
index 5af6aefa5d..e1a47776eb 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ChannelAdapterParserTests.java
@@ -21,18 +21,19 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
-import org.springframework.context.Lifecycle;
+import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolutionException;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PollableChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
-import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.endpoint.EventDrivenConsumer;
+import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.message.StringMessage;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
@@ -43,13 +44,22 @@ import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests;
@ContextConfiguration
public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests {
+ @Before
+ public void startContext() {
+ ((AbstractApplicationContext) this.applicationContext).start();
+ }
+
+ @After
+ public void stopContext() {
+ ((AbstractApplicationContext) this.applicationContext).stop();
+ }
+
+
@Test
public void targetOnly() {
String beanName = "outboundWithImplicitChannel";
Object channel = this.applicationContext.getBean(beanName);
assertTrue(channel instanceof DirectChannel);
- Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- bus.start();
BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext);
assertNotNull(channelResolver.resolveChannelName(beanName));
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
@@ -61,7 +71,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
assertTrue(((MessageChannel) channel).send(message));
assertNotNull(consumer.getLastMessage());
assertEquals(message, consumer.getLastMessage());
- bus.stop();
}
@Test
@@ -69,8 +78,6 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
String beanName = "methodInvokingConsumer";
Object channel = this.applicationContext.getBean(beanName);
assertTrue(channel instanceof DirectChannel);
- Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- bus.start();
BeanFactoryChannelResolver channelResolver = new BeanFactoryChannelResolver(this.applicationContext);
assertNotNull(channelResolver.resolveChannelName(beanName));
Object adapter = this.applicationContext.getBean(beanName + ".adapter");
@@ -82,24 +89,20 @@ public class ChannelAdapterParserTests extends AbstractJUnit4SpringContextTests
assertTrue(((MessageChannel) channel).send(message));
assertNotNull(testBean.getMessage());
assertEquals("consumer test", testBean.getMessage());
- bus.stop();
}
@Test
public void methodInvokingSource() {
String beanName = "methodInvokingSource";
PollableChannel channel = (PollableChannel) this.applicationContext.getBean("queueChannel");
- Lifecycle bus = (Lifecycle) this.applicationContext.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- bus.start();
+ TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
+ testBean.store("source test");
Object adapter = this.applicationContext.getBean(beanName);
assertNotNull(adapter);
assertTrue(adapter instanceof SourcePollingChannelAdapter);
- TestBean testBean = (TestBean) this.applicationContext.getBean("testBean");
- testBean.store("source test");
Message> message = channel.receive(1000);
assertNotNull(message);
assertEquals("source test", testBean.getMessage());
- bus.stop();
}
@Test(expected = ChannelResolutionException.class)
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java
index 7c106caacb..4d581fac71 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/MessageBusParserTests.java
@@ -17,25 +17,16 @@
package org.springframework.integration.config;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
-import org.springframework.beans.factory.BeanCreationException;
-import org.springframework.beans.factory.BeanDefinitionStoreException;
import org.springframework.context.ApplicationContext;
-import org.springframework.context.Lifecycle;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.core.task.SyncTaskExecutor;
-import org.springframework.integration.bus.MessageBusEventTests.TestMessageBusEventListener;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
-import org.springframework.integration.config.xml.MessageBusParser;
-import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
@@ -60,42 +51,6 @@ public class MessageBusParserTests {
assertEquals(context.getBean("errorChannel"), resolver.resolveChannelName("errorChannel"));
}
- @Test
- public void testMultipleMessageBusElements() {
- boolean exceptionThrown = false;
- try {
- new ClassPathXmlApplicationContext("multipleMessageBusElements.xml", this.getClass());
- }
- catch (BeanDefinitionStoreException e) {
- exceptionThrown = true;
- assertEquals(IllegalStateException.class, e.getCause().getClass());
- }
- assertTrue(exceptionThrown);
- }
-
- @Test
- public void testMessageBusElementAndBean() {
- boolean exceptionThrown = false;
- try {
- new ClassPathXmlApplicationContext("messageBusElementAndBean.xml", this.getClass());
- }
- catch (BeanCreationException e) {
- exceptionThrown = true;
- assertEquals(IllegalStateException.class, e.getCause().getClass());
- assertEquals(e.getBeanName(), MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- }
- assertTrue(exceptionThrown);
- }
-
- @Test
- public void testAutoStartup() {
- ApplicationContext context = new ClassPathXmlApplicationContext(
- "messageBusWithAutoStartup.xml", this.getClass());
- Lifecycle bus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- assertTrue(bus.isRunning());
- bus.stop();
- }
-
@Test
public void testMulticasterIsSyncByDefault() {
ApplicationContext context = new ClassPathXmlApplicationContext(
@@ -131,42 +86,4 @@ public class MessageBusParserTests {
assertEquals(ThreadPoolTaskExecutor.class, taskExecutor.getClass());
}
- @Test
- public void testMessageBusEventListenerReceivesStartedEvent() {
- ApplicationContext context = new ClassPathXmlApplicationContext(
- "messageBusWithListener.xml", this.getClass());
- Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
- assertNull(listener.getStartedBus());
- assertNull(listener.getStoppedBus());
- messageBus.start();
- assertNotNull(listener.getStartedBus());
- assertEquals(messageBus, listener.getStartedBus());
- assertNull(listener.getStoppedBus());
- }
-
- @Test
- public void testMessageBusEventListenerReceivesStoppedEvent() {
- ApplicationContext context = new ClassPathXmlApplicationContext(
- "messageBusWithListener.xml", this.getClass());
- Lifecycle messageBus = (Lifecycle) context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- TestMessageBusEventListener listener = (TestMessageBusEventListener) context.getBean("listener");
- assertNull(listener.getStoppedBus());
- messageBus.start();
- messageBus.stop();
- assertNotNull(listener.getStoppedBus());
- assertEquals(messageBus, listener.getStoppedBus());
- }
-
- @Test
- public void testMessageBusWithTaskScheduler() {
- ApplicationContext context = new ClassPathXmlApplicationContext(
- "messageBusWithTaskScheduler.xml", this.getClass());
- Object messageBus = context.getBean(MessageBusParser.MESSAGE_BUS_BEAN_NAME);
- StubTaskScheduler schedulerBean = (StubTaskScheduler) context.getBean("testScheduler");
- TaskScheduler busScheduler = (TaskScheduler) new DirectFieldAccessor(messageBus).getPropertyValue("taskScheduler");
- assertNotNull(busScheduler);
- assertEquals(schedulerBean, busScheduler);
- }
-
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java
index a96276406f..21b3be34fe 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/ServiceActivatorAnnotationPostProcessorTests.java
@@ -25,16 +25,14 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.beans.factory.support.RootBeanDefinition;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.annotation.MessagingAnnotationPostProcessor;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -44,19 +42,14 @@ public class ServiceActivatorAnnotationPostProcessorTests {
@Test
public void testAnnotatedMethod() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
+ RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
+ context.registerBeanDefinition("postProcessor", postProcessorDef);
context.registerBeanDefinition("testChannel", new RootBeanDefinition(QueueChannel.class));
RootBeanDefinition beanDefinition = new RootBeanDefinition(SimpleServiceActivatorAnnotationTestBean.class);
beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(latch);
context.registerBeanDefinition("testBean", beanDefinition);
- String busBeanName = MessageBusParser.MESSAGE_BUS_BEAN_NAME;
- RootBeanDefinition busBeanDefinition = new RootBeanDefinition(ApplicationContextMessageBus.class);
- busBeanDefinition.getPropertyValues().addPropertyValue("taskScheduler", TestUtils.createTaskScheduler(10));
- context.registerBeanDefinition(busBeanName, busBeanDefinition);
- RootBeanDefinition postProcessorDef = new RootBeanDefinition(MessagingAnnotationPostProcessor.class);
- context.registerBeanDefinition("postProcessor", postProcessorDef);
context.refresh();
- context.start();
SimpleServiceActivatorAnnotationTestBean testBean = (SimpleServiceActivatorAnnotationTestBean) context.getBean("testBean");
assertEquals(1, latch.getCount());
assertNull(testBean.getMessageText());
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java
index f1e2f03a1a..d98b5e1ce8 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java
@@ -26,9 +26,9 @@ import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
@@ -55,7 +55,7 @@ public class AnnotatedEndpointActivationTests {
private PollableChannel output;
@Autowired
- private ApplicationContextMessageBus messageBus;
+ private AbstractApplicationContext applicationContext;
// This has to be static because the MessageBus registers the handler
// more than once (every time a test instance is created), but only one of
@@ -90,15 +90,15 @@ public class AnnotatedEndpointActivationTests {
}
@Test(expected = MessageDeliveryException.class)
- public void stopMessageBus() {
- messageBus.stop();
+ public void stopContext() {
+ applicationContext.stop();
this.input.send(new GenericMessage("foo"));
}
@Test
- public void stopAndRestartMessageBus() {
- messageBus.stop();
- messageBus.start();
+ public void stopAndRestartContext() {
+ applicationContext.stop();
+ applicationContext.start();
this.input.send(new GenericMessage("foo"));
Message> message = this.output.receive(100);
assertNotNull(message);
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java
index 2205b3c724..79291500e0 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationPostProcessorTests.java
@@ -28,22 +28,18 @@ import org.junit.Test;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.DirectFieldAccessor;
-import org.springframework.beans.factory.NoSuchBeanDefinitionException;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.ChannelAdapter;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.PollingConsumer;
@@ -53,6 +49,7 @@ import org.springframework.integration.message.StringMessage;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -61,14 +58,9 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void serviceActivatorAnnotation() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -131,12 +123,7 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void targetAnnotation() throws InterruptedException {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -144,14 +131,13 @@ public class MessagingAnnotationPostProcessorTests {
OutboundChannelAdapterTestBean testBean = new OutboundChannelAdapterTestBean(latch);
postProcessor.postProcessAfterInitialization(testBean, "testBean");
context.refresh();
- messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
MessageChannel testChannel = channelResolver.resolveChannelName("testChannel");
testChannel.send(new StringMessage("foo"));
latch.await(1000, TimeUnit.MILLISECONDS);
assertEquals(0, latch.getCount());
assertEquals("foo", testBean.getMessageText());
- messageBus.stop();
+ context.stop();
}
@Test(expected = IllegalArgumentException.class)
@@ -161,29 +147,13 @@ public class MessagingAnnotationPostProcessorTests {
postProcessor.afterPropertiesSet();
}
- @Test(expected = NoSuchBeanDefinitionException.class)
- public void testPostProcessorWithoutMessageBus() {
- GenericApplicationContext context = new GenericApplicationContext();
- MessagingAnnotationPostProcessor postProcessor =
- new MessagingAnnotationPostProcessor();
- postProcessor.setBeanFactory(context.getBeanFactory());
- postProcessor.afterPropertiesSet();
- }
-
@Test
public void testChannelResolution() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
DirectChannel inputChannel = new DirectChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -195,22 +165,16 @@ public class MessagingAnnotationPostProcessorTests {
inputChannel.send(message);
Message> reply = outputChannel.receive(0);
assertNotNull(reply);
+ context.stop();
}
@Test
public void testProxiedMessageEndpointAnnotation() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -218,53 +182,37 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("world"));
Message> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testMessageEndpointAnnotationInherited() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointSubclass(), "subclass");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("world"));
Message> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedWithProxy() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -272,79 +220,55 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("world"));
Message> message = outputChannel.receive(1000);
assertEquals("hello world", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterface() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithAutoCreatedChannels() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
postProcessor.postProcessAfterInitialization(new SimpleAnnotatedEndpointImplementation(), "impl");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testMessageEndpointAnnotationInheritedFromInterfaceWithProxy() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel inputChannel = new QueueChannel();
QueueChannel outputChannel = new QueueChannel();
- inputChannel.setBeanName("inputChannel");
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("inputChannel", inputChannel);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -352,23 +276,17 @@ public class MessagingAnnotationPostProcessorTests {
Object proxy = proxyFactory.getProxy();
postProcessor.postProcessAfterInitialization(proxy, "proxy");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("ABC"));
Message> message = outputChannel.receive(1000);
assertEquals("test-ABC", message.getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testEndpointWithPollerAnnotation() {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
QueueChannel testChannel = new QueueChannel();
- testChannel.setBeanName("testChannel");
- context.getBeanFactory().registerSingleton("testChannel", testChannel);
- messageBus.setApplicationContext(context);
+ context.registerChannel("testChannel", testChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -385,19 +303,12 @@ public class MessagingAnnotationPostProcessorTests {
@Test
public void testChannelAdapterAnnotation() throws InterruptedException {
- GenericApplicationContext context = new GenericApplicationContext();
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
ChannelAdapterAnnotationTestBean testBean = new ChannelAdapterAnnotationTestBean();
postProcessor.postProcessAfterInitialization(testBean, "testBean");
- context.refresh();
- messageBus.start();
ChannelResolver channelResolver = new BeanFactoryChannelResolver(context);
DirectChannel testChannel = (DirectChannel) channelResolver.resolveChannelName("testChannel");
final CountDownLatch latch = new CountDownLatch(1);
@@ -408,26 +319,21 @@ public class MessagingAnnotationPostProcessorTests {
latch.countDown();
}
});
+ context.refresh();
latch.await(3, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
assertNotNull(receivedMessage.get());
assertEquals("test", receivedMessage.get().getPayload());
- messageBus.stop();
+ context.stop();
}
@Test
public void testTransformer() {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
DirectChannel inputChannel = new DirectChannel();
- inputChannel.setBeanName("inputChannel");
- context.getBeanFactory().registerSingleton("inputChannel", inputChannel);
+ context.registerChannel("inputChannel", inputChannel);
QueueChannel outputChannel = new QueueChannel();
- outputChannel.setBeanName("outputChannel");
- context.getBeanFactory().registerSingleton("outputChannel", outputChannel);
- ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
- context.getBeanFactory().registerSingleton(MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("outputChannel", outputChannel);
MessagingAnnotationPostProcessor postProcessor = new MessagingAnnotationPostProcessor();
postProcessor.setBeanFactory(context.getBeanFactory());
postProcessor.afterPropertiesSet();
@@ -437,6 +343,7 @@ public class MessagingAnnotationPostProcessorTests {
inputChannel.send(new StringMessage("foo"));
Message> reply = outputChannel.receive(0);
assertEquals("FOO", reply.getPayload());
+ context.stop();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java
index 1e8624c7f7..18dcae80da 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/RouterAnnotationPostProcessorTests.java
@@ -21,25 +21,21 @@ import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Test;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Router;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class RouterAnnotationPostProcessorTests {
- private GenericApplicationContext context = new GenericApplicationContext();
-
- private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
+ private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel inputChannel = new DirectChannel();
@@ -48,14 +44,8 @@ public class RouterAnnotationPostProcessorTests {
@Before
public void init() {
- messageBus.setApplicationContext(context);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- inputChannel.setBeanName("input");
- outputChannel.setBeanName("output");
- context.getBeanFactory().registerSingleton("input", inputChannel);
- context.getBeanFactory().registerSingleton("output", outputChannel);
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
+ context.registerChannel("input", inputChannel);
+ context.registerChannel("output", outputChannel);
}
@@ -67,10 +57,10 @@ public class RouterAnnotationPostProcessorTests {
TestRouter testRouter = new TestRouter();
postProcessor.postProcessAfterInitialization(testRouter, "test");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("foo"));
Message> replyMessage = outputChannel.receive(0);
assertEquals("foo", replyMessage.getPayload());
+ context.stop();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java
index ba56ee9fc0..2d2736653b 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/config/annotation/SplitterAnnotationPostProcessorTests.java
@@ -23,25 +23,21 @@ import static org.junit.Assert.assertNull;
import org.junit.Before;
import org.junit.Test;
-import org.springframework.context.support.GenericApplicationContext;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Splitter;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
-import org.springframework.integration.config.xml.MessageBusParser;
import org.springframework.integration.core.Message;
import org.springframework.integration.message.StringMessage;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
*/
public class SplitterAnnotationPostProcessorTests {
- private GenericApplicationContext context = new GenericApplicationContext();
-
- private ApplicationContextMessageBus messageBus = new ApplicationContextMessageBus();
+ private TestApplicationContext context = TestUtils.createTestApplicationContext();
private DirectChannel inputChannel = new DirectChannel();
@@ -50,14 +46,8 @@ public class SplitterAnnotationPostProcessorTests {
@Before
public void init() {
- inputChannel.setBeanName("input");
- outputChannel.setBeanName("output");
- context.getBeanFactory().registerSingleton("input", inputChannel);
- context.getBeanFactory().registerSingleton("output", outputChannel);
- context.getBeanFactory().registerSingleton(
- MessageBusParser.MESSAGE_BUS_BEAN_NAME, messageBus);
- messageBus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- messageBus.setApplicationContext(context);
+ context.registerChannel("input", inputChannel);
+ context.registerChannel("output", outputChannel);
}
@@ -69,7 +59,6 @@ public class SplitterAnnotationPostProcessorTests {
TestSplitter splitter = new TestSplitter();
postProcessor.postProcessAfterInitialization(splitter, "testSplitter");
context.refresh();
- messageBus.start();
inputChannel.send(new StringMessage("this.is.a.test"));
Message> message1 = outputChannel.receive(500);
assertNotNull(message1);
@@ -84,7 +73,7 @@ public class SplitterAnnotationPostProcessorTests {
assertNotNull(message4);
assertEquals("test", message4.getPayload());
assertNull(outputChannel.receive(0));
- messageBus.stop();
+ context.stop();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java
index b159365e12..f5e5aa84ea 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/gateway/SimpleMessagingGatewayTests.java
@@ -36,6 +36,7 @@ import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.message.MessageDeliveryException;
+import org.springframework.integration.util.TestUtils;
/**
* @author Iwein Fuld
@@ -59,6 +60,7 @@ public class SimpleMessagingGatewayTests {
this.simpleMessagingGateway = new SimpleMessagingGateway();
this.simpleMessagingGateway.setRequestChannel(requestChannel);
this.simpleMessagingGateway.setReplyChannel(replyChannel);
+ this.simpleMessagingGateway.setBeanFactory(TestUtils.createTestApplicationContext());
reset(allmocks);
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java
index 4180b9df4b..543840bfbf 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/message/MethodInvokingMessageHandlerTests.java
@@ -26,14 +26,13 @@ import java.util.concurrent.TimeUnit;
import org.junit.Test;
-import org.springframework.context.support.GenericApplicationContext;
-import org.springframework.integration.bus.ApplicationContextMessageBus;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessagingException;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.handler.MethodInvokingMessageHandler;
import org.springframework.integration.util.TestUtils;
+import org.springframework.integration.util.TestUtils.TestApplicationContext;
/**
* @author Mark Fisher
@@ -75,27 +74,22 @@ public class MethodInvokingMessageHandlerTests {
@Test
public void subscription() throws Exception {
- GenericApplicationContext context = new GenericApplicationContext();
+ TestApplicationContext context = TestUtils.createTestApplicationContext();
SynchronousQueue queue = new SynchronousQueue();
TestBean testBean = new TestBean(queue);
QueueChannel channel = new QueueChannel();
- channel.setBeanName("channel");
- context.getBeanFactory().registerSingleton("channel", channel);
+ context.registerChannel("channel", channel);
Message message = new GenericMessage("testing");
channel.send(message);
assertNull(queue.poll());
MethodInvokingMessageHandler handler = new MethodInvokingMessageHandler(testBean, "foo");
PollingConsumer endpoint = new PollingConsumer(channel, handler);
- context.getBeanFactory().registerSingleton("testEndpoint", endpoint);
- ApplicationContextMessageBus bus = new ApplicationContextMessageBus();
- bus.setTaskScheduler(TestUtils.createTaskScheduler(10));
- bus.setApplicationContext(context);
+ context.registerEndpoint("testEndpoint", endpoint);
context.refresh();
- bus.start();
String result = queue.poll(1000, TimeUnit.MILLISECONDS);
assertNotNull(result);
assertEquals("testing", result);
- bus.stop();
+ context.stop();
}
diff --git a/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java b/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java
index bfb5d0445f..bc231d2581 100644
--- a/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java
+++ b/org.springframework.integration/src/test/java/org/springframework/integration/util/TestUtils.java
@@ -19,6 +19,16 @@ package org.springframework.integration.util;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import org.springframework.beans.DirectFieldAccessor;
+import org.springframework.beans.FatalBeanException;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.BeanFactoryAware;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
+import org.springframework.context.support.GenericApplicationContext;
+import org.springframework.integration.context.IntegrationContextUtils;
+import org.springframework.integration.core.MessageChannel;
+import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.scheduling.SimpleTaskScheduler;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -56,6 +66,12 @@ public abstract class TestUtils {
return (T) value;
}
+ public static TestApplicationContext createTestApplicationContext() {
+ TestApplicationContext context = new TestApplicationContext();
+ registerBean(IntegrationContextUtils.TASK_SCHEDULER_BEAN_NAME, createTaskScheduler(10), context);
+ return context;
+ }
+
public static TaskScheduler createTaskScheduler(int poolSize) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(poolSize);
@@ -64,4 +80,56 @@ public abstract class TestUtils {
return new SimpleTaskScheduler(executor);
}
+ private static void registerBean(String beanName, Object bean, BeanFactory beanFactory) {
+ Assert.notNull(beanName, "bean name must not be null");
+ ConfigurableListableBeanFactory configurableListableBeanFactory = null;
+ if (beanFactory instanceof ConfigurableListableBeanFactory) {
+ configurableListableBeanFactory = (ConfigurableListableBeanFactory) beanFactory;
+ }
+ else if (beanFactory instanceof GenericApplicationContext) {
+ configurableListableBeanFactory = ((GenericApplicationContext) beanFactory).getBeanFactory();
+ }
+ if (bean instanceof BeanNameAware) {
+ ((BeanNameAware) bean).setBeanName(beanName);
+ }
+ if (bean instanceof BeanFactoryAware) {
+ ((BeanFactoryAware) bean).setBeanFactory(beanFactory);
+ }
+ if (bean instanceof InitializingBean) {
+ try {
+ ((InitializingBean) bean).afterPropertiesSet();
+ }
+ catch (Exception e) {
+ throw new FatalBeanException("failed to register bean with test context", e);
+ }
+ }
+ configurableListableBeanFactory.registerSingleton(beanName, bean);
+ }
+
+
+ public static class TestApplicationContext extends GenericApplicationContext {
+
+ private TestApplicationContext() {
+ super();
+ }
+
+ public void registerChannel(String channelName, MessageChannel channel) {
+ if (channel.getName() != null) {
+ if (channelName == null) {
+ Assert.notNull(channel.getName(), "channel name must not be null");
+ channelName = channel.getName();
+ }
+ else {
+ Assert.isTrue(channel.getName().equals(channelName),
+ "channel name has already been set with a conflicting value");
+ }
+ }
+ registerBean(channelName, channel, this);
+ }
+
+ public void registerEndpoint(String endpointName, AbstractEndpoint endpoint) {
+ registerBean(endpointName, endpoint, this);
+ }
+ }
+
}