diff --git a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java index 4c1cc4cb9e..524f67f44c 100644 --- a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java +++ b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java @@ -50,8 +50,7 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor } } - @Override - protected void onEvent(ApplicationEvent event) { + public void onApplicationEvent(ApplicationEvent event) { if (CollectionUtils.isEmpty(this.eventTypes)) { this.sendEventAsMessage(event); return; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 36f1e4f6ff..3a7b387ec5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -29,12 +29,11 @@ import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.endpoint.AbstractEndpoint; -import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; -import org.springframework.integration.util.LifecycleSupport.AutoStartMode; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.util.Assert; @@ -176,9 +175,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar throw new IllegalArgumentException( "unsupported channel type: [" + channel.getClass() + "]"); } - if (!this.autoStartup) { - this.endpoint.setAutoStartMode(AutoStartMode.NONE); - } + this.endpoint.setAutoStartup(this.autoStartup); this.endpoint.setBeanName(this.beanName); this.endpoint.setBeanFactory(this.beanFactory); if (this.endpoint instanceof InitializingBean) { @@ -189,7 +186,9 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar } public void onApplicationEvent(ApplicationEvent event) { - this.endpoint.onApplicationEvent(event); + if (this.endpoint instanceof ApplicationListener) { + ((ApplicationListener) this.endpoint).onApplicationEvent(event); + } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java index 98e52d242b..e8e2c85870 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java @@ -23,7 +23,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.scheduling.IntervalTrigger; -import org.springframework.integration.util.LifecycleSupport.AutoStartMode; import org.springframework.util.Assert; import org.springframework.util.xml.DomUtils; @@ -53,10 +52,7 @@ public abstract class AbstractPollingInboundChannelAdapterParser extends Abstrac else { adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval())); } - String autoStart = element.getAttribute("auto-startup"); - if ("false".equals(autoStart)) { - adapterBuilder.addPropertyValue("autoStartMode", AutoStartMode.NONE); - } + IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup"); return adapterBuilder.getBeanDefinition(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java new file mode 100644 index 0000000000..eae41b013a --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.context; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.integration.channel.BeanFactoryChannelResolver; +import org.springframework.integration.channel.ChannelResolver; +import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.util.Assert; + +/** + * A base class that provides convenient access to the bean factory as + * well as {@link ChannelResolver} and {@link TaskScheduler} instances. + * + *
This is intended to be used as a base class for internal framework + * components whereas code built upon the integration framework should not + * require tight coupling with the context but rather rely on standard + * dependency injection. + * + * @author Mark Fisher + */ +public abstract class IntegrationObjectSupport implements BeanNameAware, BeanFactoryAware { + + /** Logger that is available to subclasses */ + protected final Log logger = LogFactory.getLog(getClass()); + + private volatile String beanName; + + private volatile BeanFactory beanFactory; + + private volatile ChannelResolver channelResolver; + + private volatile TaskScheduler taskScheduler; + + + public void setBeanName(String beanName) { + this.beanName = beanName; + } + + public final void setBeanFactory(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "beanFactory must not be null"); + this.beanFactory = beanFactory; + this.channelResolver = new BeanFactoryChannelResolver(beanFactory); + TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory); + if (taskScheduler != null) { + this.taskScheduler = taskScheduler; + } + } + + protected BeanFactory getBeanFactory() { + return this.beanFactory; + } + + protected ChannelResolver getChannelResolver() { + return this.channelResolver; + } + + protected TaskScheduler getTaskScheduler() { + return this.taskScheduler; + } + + protected void setTaskScheduler(TaskScheduler taskScheduler) { + Assert.notNull(taskScheduler, "taskScheduler must not be null"); + this.taskScheduler = taskScheduler; + } + + + @Override + public String toString() { + return (this.beanName != null) ? this.beanName : super.toString(); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 3224427089..b38843f138 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,66 +16,112 @@ package org.springframework.integration.endpoint; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.BeanNameAware; -import org.springframework.integration.channel.BeanFactoryChannelResolver; -import org.springframework.integration.channel.ChannelResolver; -import org.springframework.integration.context.IntegrationContextUtils; +import java.util.concurrent.locks.ReentrantLock; + +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.Lifecycle; +import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.util.LifecycleSupport; -import org.springframework.util.Assert; /** * The base class for Message Endpoint implementations. * + *
This class implements Lifecycle and provides an {@link #autoStartup}
+ * property. If true, the endpoint will start automatically upon
+ * initialization. Otherwise, it will require an explicit invocation of its
+ * {@link #start()} method. The default value is true.
+ * To require explicit startup, provide a value of false
+ * to the {@link #setAutoStartup(boolean)} method.
+ *
* @author Mark Fisher
*/
-public abstract class AbstractEndpoint extends LifecycleSupport implements BeanNameAware, BeanFactoryAware {
+public abstract class AbstractEndpoint extends IntegrationObjectSupport implements Lifecycle, InitializingBean {
- private volatile String beanName;
+ private volatile boolean autoStartup = true;
- private volatile BeanFactory beanFactory;
+ private volatile boolean running;
- private volatile ChannelResolver channelResolver;
-
- private volatile TaskScheduler taskScheduler;
+ private final ReentrantLock lifecycleLock = new ReentrantLock();
- public void setBeanName(String beanName) {
- this.beanName = beanName;
- }
-
- public final void setBeanFactory(BeanFactory beanFactory) {
- Assert.notNull(beanFactory, "beanFactory must not be null");
- this.beanFactory = beanFactory;
- this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
- TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory);
- if (taskScheduler != null) {
- this.setTaskScheduler(taskScheduler);
- }
- }
-
- protected BeanFactory getBeanFactory() {
- return this.beanFactory;
+ public void setAutoStartup(boolean autoStartup) {
+ this.autoStartup = autoStartup;
}
public void setTaskScheduler(TaskScheduler taskScheduler) {
- Assert.notNull(taskScheduler, "taskScheduler must not be null");
- this.taskScheduler = taskScheduler;
+ super.setTaskScheduler(taskScheduler);
}
- protected TaskScheduler getTaskScheduler() {
- return this.taskScheduler;
+ public final void afterPropertiesSet() {
+ try {
+ this.onInit();
+ if (this.autoStartup) {
+ this.start();
+ }
+ }
+ catch (Exception e) {
+ throw new BeanInitializationException("failed to initialize", e);
+ }
}
- protected ChannelResolver getChannelResolver() {
- return this.channelResolver;
+ // Lifecycle implementation
+
+ public final boolean isRunning() {
+ this.lifecycleLock.lock();
+ try {
+ return this.running;
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
}
- @Override
- public String toString() {
- return (this.beanName != null) ? this.beanName : super.toString();
+ public final void start() {
+ this.lifecycleLock.lock();
+ try {
+ if (!this.running) {
+ this.doStart();
+ this.running = true;
+ if (logger.isInfoEnabled()) {
+ logger.info("started " + this);
+ }
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
}
+ public final void stop() {
+ this.lifecycleLock.lock();
+ try {
+ if (this.running) {
+ this.doStop();
+ this.running = false;
+ if (logger.isInfoEnabled()) {
+ logger.info("stopped " + this);
+ }
+ }
+ }
+ finally {
+ this.lifecycleLock.unlock();
+ }
+ }
+
+ protected void onInit() throws Exception {
+ }
+
+ /**
+ * Subclasses must implement this method with the start behavior.
+ * This method will be invoked while holding the {@link #lifecycleLock}.
+ */
+ protected abstract void doStart();
+
+ /**
+ * Subclasses must implement this method with the stop behavior.
+ * This method will be invoked while holding the {@link #lifecycleLock}.
+ */
+ protected abstract void doStop();
+
}
diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
index e6d5f3c406..f1c7bfee53 100644
--- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
+++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java
@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,26 +34,38 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
+import org.springframework.context.ApplicationEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandler;
-import org.springframework.integration.util.LifecycleSupport;
import org.springframework.scheduling.SchedulingException;
import org.springframework.util.Assert;
/**
- * An implementation of {@link TaskScheduler} that delegates to a {@link TaskExecutor}.
- *
+ * An implementation of {@link TaskScheduler} that delegates to any instance
+ * of {@link TaskExecutor}.
+ *
+ *
This class implements Lifecycle and provides an {@link #autoStartup}
+ * property. If true, the scheduler will start automatically upon
+ * receiving the {@link ContextRefreshedEvent}. Otherwise, it will require an
+ * explicit invocation of its {@link #start()} method. The default value is
+ * true. To require explicit startup, provide a value of
+ * false to the {@link #setAutoStartup(boolean)} method.
+ *
* @author Mark Fisher
* @author Marius Bogoevici
*/
-public class SimpleTaskScheduler extends LifecycleSupport implements TaskScheduler, BeanFactoryAware, DisposableBean {
+public class SimpleTaskScheduler implements TaskScheduler, BeanFactoryAware, ApplicationListener, DisposableBean {
private final Log logger = LogFactory.getLog(this.getClass());
private final TaskExecutor executor;
+ private volatile boolean autoStartup = true;
+
private volatile ErrorHandler errorHandler;
private volatile SchedulerTask schedulerTask = null;
@@ -61,14 +74,21 @@ public class SimpleTaskScheduler extends LifecycleSupport implements TaskSchedul
private final Set