From 6b468da46deadd92e821d47c8bcb00c8cfc5ea29 Mon Sep 17 00:00:00 2001 From: Mark Fisher Date: Thu, 13 Nov 2008 17:49:13 +0000 Subject: [PATCH] Removed the LifeycleSupport base class. SimpleTaskScheduler and AbstractEndpoint now provide their own Lifecycle implementations. Added an IntegrationObjectSupport class with convenient access to the 'channelResolver' and 'taskScheduler' instances. --- ...ApplicationEventInboundChannelAdapter.java | 3 +- .../config/ConsumerEndpointFactoryBean.java | 11 +- ...actPollingInboundChannelAdapterParser.java | 6 +- .../context/IntegrationObjectSupport.java | 92 ++++++++++++ .../endpoint/AbstractEndpoint.java | 124 ++++++++++----- .../scheduling/SimpleTaskScheduler.java | 102 ++++++++++--- .../integration/util/LifecycleSupport.java | 142 ------------------ 7 files changed, 264 insertions(+), 216 deletions(-) create mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java delete mode 100644 org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java diff --git a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java index 4c1cc4cb9e..524f67f44c 100644 --- a/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java +++ b/org.springframework.integration.event/src/main/java/org/springframework/integration/event/ApplicationEventInboundChannelAdapter.java @@ -50,8 +50,7 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor } } - @Override - protected void onEvent(ApplicationEvent event) { + public void onApplicationEvent(ApplicationEvent event) { if (CollectionUtils.isEmpty(this.eventTypes)) { this.sendEventAsMessage(event); return; diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java index 36f1e4f6ff..3a7b387ec5 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/ConsumerEndpointFactoryBean.java @@ -29,12 +29,11 @@ import org.springframework.integration.channel.PollableChannel; import org.springframework.integration.channel.SubscribableChannel; import org.springframework.integration.core.MessageChannel; import org.springframework.integration.endpoint.AbstractEndpoint; -import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.endpoint.EventDrivenConsumer; +import org.springframework.integration.endpoint.PollingConsumer; import org.springframework.integration.message.MessageHandler; import org.springframework.integration.scheduling.IntervalTrigger; import org.springframework.integration.scheduling.Trigger; -import org.springframework.integration.util.LifecycleSupport.AutoStartMode; import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.TransactionDefinition; import org.springframework.util.Assert; @@ -176,9 +175,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar throw new IllegalArgumentException( "unsupported channel type: [" + channel.getClass() + "]"); } - if (!this.autoStartup) { - this.endpoint.setAutoStartMode(AutoStartMode.NONE); - } + this.endpoint.setAutoStartup(this.autoStartup); this.endpoint.setBeanName(this.beanName); this.endpoint.setBeanFactory(this.beanFactory); if (this.endpoint instanceof InitializingBean) { @@ -189,7 +186,9 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar } public void onApplicationEvent(ApplicationEvent event) { - this.endpoint.onApplicationEvent(event); + if (this.endpoint instanceof ApplicationListener) { + ((ApplicationListener) this.endpoint).onApplicationEvent(event); + } } } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java index 98e52d242b..e8e2c85870 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/config/xml/AbstractPollingInboundChannelAdapterParser.java @@ -23,7 +23,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; import org.springframework.integration.endpoint.SourcePollingChannelAdapter; import org.springframework.integration.scheduling.IntervalTrigger; -import org.springframework.integration.util.LifecycleSupport.AutoStartMode; import org.springframework.util.Assert; import org.springframework.util.xml.DomUtils; @@ -53,10 +52,7 @@ public abstract class AbstractPollingInboundChannelAdapterParser extends Abstrac else { adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval())); } - String autoStart = element.getAttribute("auto-startup"); - if ("false".equals(autoStart)) { - adapterBuilder.addPropertyValue("autoStartMode", AutoStartMode.NONE); - } + IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup"); return adapterBuilder.getBeanDefinition(); } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java new file mode 100644 index 0000000000..eae41b013a --- /dev/null +++ b/org.springframework.integration/src/main/java/org/springframework/integration/context/IntegrationObjectSupport.java @@ -0,0 +1,92 @@ +/* + * Copyright 2002-2008 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.context; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.BeanFactoryAware; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.integration.channel.BeanFactoryChannelResolver; +import org.springframework.integration.channel.ChannelResolver; +import org.springframework.integration.scheduling.TaskScheduler; +import org.springframework.util.Assert; + +/** + * A base class that provides convenient access to the bean factory as + * well as {@link ChannelResolver} and {@link TaskScheduler} instances. + * + *

This is intended to be used as a base class for internal framework + * components whereas code built upon the integration framework should not + * require tight coupling with the context but rather rely on standard + * dependency injection. + * + * @author Mark Fisher + */ +public abstract class IntegrationObjectSupport implements BeanNameAware, BeanFactoryAware { + + /** Logger that is available to subclasses */ + protected final Log logger = LogFactory.getLog(getClass()); + + private volatile String beanName; + + private volatile BeanFactory beanFactory; + + private volatile ChannelResolver channelResolver; + + private volatile TaskScheduler taskScheduler; + + + public void setBeanName(String beanName) { + this.beanName = beanName; + } + + public final void setBeanFactory(BeanFactory beanFactory) { + Assert.notNull(beanFactory, "beanFactory must not be null"); + this.beanFactory = beanFactory; + this.channelResolver = new BeanFactoryChannelResolver(beanFactory); + TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory); + if (taskScheduler != null) { + this.taskScheduler = taskScheduler; + } + } + + protected BeanFactory getBeanFactory() { + return this.beanFactory; + } + + protected ChannelResolver getChannelResolver() { + return this.channelResolver; + } + + protected TaskScheduler getTaskScheduler() { + return this.taskScheduler; + } + + protected void setTaskScheduler(TaskScheduler taskScheduler) { + Assert.notNull(taskScheduler, "taskScheduler must not be null"); + this.taskScheduler = taskScheduler; + } + + + @Override + public String toString() { + return (this.beanName != null) ? this.beanName : super.toString(); + } + +} diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java index 3224427089..b38843f138 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/endpoint/AbstractEndpoint.java @@ -16,66 +16,112 @@ package org.springframework.integration.endpoint; -import org.springframework.beans.factory.BeanFactory; -import org.springframework.beans.factory.BeanFactoryAware; -import org.springframework.beans.factory.BeanNameAware; -import org.springframework.integration.channel.BeanFactoryChannelResolver; -import org.springframework.integration.channel.ChannelResolver; -import org.springframework.integration.context.IntegrationContextUtils; +import java.util.concurrent.locks.ReentrantLock; + +import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.Lifecycle; +import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.scheduling.TaskScheduler; -import org.springframework.integration.util.LifecycleSupport; -import org.springframework.util.Assert; /** * The base class for Message Endpoint implementations. * + *

This class implements Lifecycle and provides an {@link #autoStartup} + * property. If true, the endpoint will start automatically upon + * initialization. Otherwise, it will require an explicit invocation of its + * {@link #start()} method. The default value is true. + * To require explicit startup, provide a value of false + * to the {@link #setAutoStartup(boolean)} method. + * * @author Mark Fisher */ -public abstract class AbstractEndpoint extends LifecycleSupport implements BeanNameAware, BeanFactoryAware { +public abstract class AbstractEndpoint extends IntegrationObjectSupport implements Lifecycle, InitializingBean { - private volatile String beanName; + private volatile boolean autoStartup = true; - private volatile BeanFactory beanFactory; + private volatile boolean running; - private volatile ChannelResolver channelResolver; - - private volatile TaskScheduler taskScheduler; + private final ReentrantLock lifecycleLock = new ReentrantLock(); - public void setBeanName(String beanName) { - this.beanName = beanName; - } - - public final void setBeanFactory(BeanFactory beanFactory) { - Assert.notNull(beanFactory, "beanFactory must not be null"); - this.beanFactory = beanFactory; - this.channelResolver = new BeanFactoryChannelResolver(beanFactory); - TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory); - if (taskScheduler != null) { - this.setTaskScheduler(taskScheduler); - } - } - - protected BeanFactory getBeanFactory() { - return this.beanFactory; + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; } public void setTaskScheduler(TaskScheduler taskScheduler) { - Assert.notNull(taskScheduler, "taskScheduler must not be null"); - this.taskScheduler = taskScheduler; + super.setTaskScheduler(taskScheduler); } - protected TaskScheduler getTaskScheduler() { - return this.taskScheduler; + public final void afterPropertiesSet() { + try { + this.onInit(); + if (this.autoStartup) { + this.start(); + } + } + catch (Exception e) { + throw new BeanInitializationException("failed to initialize", e); + } } - protected ChannelResolver getChannelResolver() { - return this.channelResolver; + // Lifecycle implementation + + public final boolean isRunning() { + this.lifecycleLock.lock(); + try { + return this.running; + } + finally { + this.lifecycleLock.unlock(); + } } - @Override - public String toString() { - return (this.beanName != null) ? this.beanName : super.toString(); + public final void start() { + this.lifecycleLock.lock(); + try { + if (!this.running) { + this.doStart(); + this.running = true; + if (logger.isInfoEnabled()) { + logger.info("started " + this); + } + } + } + finally { + this.lifecycleLock.unlock(); + } } + public final void stop() { + this.lifecycleLock.lock(); + try { + if (this.running) { + this.doStop(); + this.running = false; + if (logger.isInfoEnabled()) { + logger.info("stopped " + this); + } + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + protected void onInit() throws Exception { + } + + /** + * Subclasses must implement this method with the start behavior. + * This method will be invoked while holding the {@link #lifecycleLock}. + */ + protected abstract void doStart(); + + /** + * Subclasses must implement this method with the stop behavior. + * This method will be invoked while holding the {@link #lifecycleLock}. + */ + protected abstract void doStop(); + } diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java index e6d5f3c406..f1c7bfee53 100644 --- a/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java +++ b/org.springframework.integration/src/main/java/org/springframework/integration/scheduling/SimpleTaskScheduler.java @@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -33,26 +34,38 @@ import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.beans.factory.DisposableBean; +import org.springframework.context.ApplicationEvent; +import org.springframework.context.ApplicationListener; +import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.core.task.TaskExecutor; import org.springframework.integration.channel.BeanFactoryChannelResolver; import org.springframework.integration.channel.MessagePublishingErrorHandler; import org.springframework.integration.util.ErrorHandler; -import org.springframework.integration.util.LifecycleSupport; import org.springframework.scheduling.SchedulingException; import org.springframework.util.Assert; /** - * An implementation of {@link TaskScheduler} that delegates to a {@link TaskExecutor}. - * + * An implementation of {@link TaskScheduler} that delegates to any instance + * of {@link TaskExecutor}. + * + *

This class implements Lifecycle and provides an {@link #autoStartup} + * property. If true, the scheduler will start automatically upon + * receiving the {@link ContextRefreshedEvent}. Otherwise, it will require an + * explicit invocation of its {@link #start()} method. The default value is + * true. To require explicit startup, provide a value of + * false to the {@link #setAutoStartup(boolean)} method. + * * @author Mark Fisher * @author Marius Bogoevici */ -public class SimpleTaskScheduler extends LifecycleSupport implements TaskScheduler, BeanFactoryAware, DisposableBean { +public class SimpleTaskScheduler implements TaskScheduler, BeanFactoryAware, ApplicationListener, DisposableBean { private final Log logger = LogFactory.getLog(this.getClass()); private final TaskExecutor executor; + private volatile boolean autoStartup = true; + private volatile ErrorHandler errorHandler; private volatile SchedulerTask schedulerTask = null; @@ -61,14 +74,21 @@ public class SimpleTaskScheduler extends LifecycleSupport implements TaskSchedul private final Set> executingTasks = Collections.synchronizedSet(new TreeSet>()); + private volatile boolean running; + + private final ReentrantLock lifecycleLock = new ReentrantLock(); + public SimpleTaskScheduler(TaskExecutor executor) { Assert.notNull(executor, "executor must not be null"); this.executor = executor; - this.setAutoStartMode(AutoStartMode.ON_CONTEXT_REFRESH); } + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; } @@ -95,28 +115,66 @@ public class SimpleTaskScheduler extends LifecycleSupport implements TaskSchedul } - // LifecycleSupport implementation + // Lifecycle implementation - @Override // guarded by super#lifecycleLock - protected void doStart() { - this.executor.execute(this.schedulerTask = new SchedulerTask()); + public final boolean isRunning() { + this.lifecycleLock.lock(); + try { + return this.running; + } + finally { + this.lifecycleLock.unlock(); + } } - @Override // guarded by super#lifecycleLock - protected void doStop() { - this.schedulerTask.deactivate(); - Thread executingThread = this.schedulerTask.executingThread.get(); - if (executingThread != null) { - executingThread.interrupt(); - } - this.scheduledTasks.clear(); - synchronized (this.executingTasks) { - for (TriggeredTask task : this.executingTasks) { - task.cancel(true); + public final void start() { + this.lifecycleLock.lock(); + try { + if (!this.running) { + this.executor.execute(this.schedulerTask = new SchedulerTask()); + this.running = true; + if (logger.isInfoEnabled()) { + logger.info("started " + this); + } } - this.executingTasks.clear(); } - this.schedulerTask = null; + finally { + this.lifecycleLock.unlock(); + } + } + + public final void stop() { + this.lifecycleLock.lock(); + try { + if (this.running) { + this.schedulerTask.deactivate(); + Thread executingThread = this.schedulerTask.executingThread.get(); + if (executingThread != null) { + executingThread.interrupt(); + } + this.scheduledTasks.clear(); + synchronized (this.executingTasks) { + for (TriggeredTask task : this.executingTasks) { + task.cancel(true); + } + this.executingTasks.clear(); + } + this.schedulerTask = null; + this.running = false; + if (logger.isInfoEnabled()) { + logger.info("stopped " + this); + } + } + } + finally { + this.lifecycleLock.unlock(); + } + } + + public final void onApplicationEvent(ApplicationEvent event) { + if (event instanceof ContextRefreshedEvent && this.autoStartup) { + this.start(); + } } public void destroy() throws Exception { diff --git a/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java b/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java deleted file mode 100644 index 64406d3305..0000000000 --- a/org.springframework.integration/src/main/java/org/springframework/integration/util/LifecycleSupport.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2002-2008 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.integration.util; - -import java.util.concurrent.locks.ReentrantLock; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.beans.factory.BeanInitializationException; -import org.springframework.beans.factory.InitializingBean; -import org.springframework.context.ApplicationEvent; -import org.springframework.context.ApplicationListener; -import org.springframework.context.Lifecycle; -import org.springframework.context.event.ContextRefreshedEvent; - -/** - * A convenience base class for Lifecycle components that supports an - * "auto-startup" mode property. Depending on the mode, the component can - * be started either upon initialization, upon receiving the - * {@link ContextRefreshedEvent}, or may require an explicit start invocation. - * The timing of the startup is determined by the value of {@link #autoStartMode}. - * The default value is {@link AutoStartMode#ON_INIT}. To require explicit startup, - * set the mode to {@link AutoStartMode#NONE} using the - * {@link #setAutoStartMode(AutoStartMode)} method. - * - * @author Mark Fisher - */ -public abstract class LifecycleSupport implements Lifecycle, InitializingBean, ApplicationListener { - - public static enum AutoStartMode { ON_INIT, ON_CONTEXT_REFRESH, NONE } - - - protected final Log logger = LogFactory.getLog(this.getClass()); - - private volatile AutoStartMode autoStartMode = AutoStartMode.ON_INIT; - - private volatile boolean running; - - private final ReentrantLock lifecycleLock = new ReentrantLock(); - - - public void setAutoStartMode(AutoStartMode autoStartMode) { - this.autoStartMode = (autoStartMode != null) ? autoStartMode : AutoStartMode.NONE; - } - - public final void afterPropertiesSet() { - try { - this.onInit(); - if (this.autoStartMode == AutoStartMode.ON_INIT) { - this.start(); - } - } - catch (Exception e) { - throw new BeanInitializationException("failed to initialize", e); - } - } - - public final void onApplicationEvent(ApplicationEvent event) { - this.onEvent(event); - if (event instanceof ContextRefreshedEvent && this.autoStartMode == AutoStartMode.ON_CONTEXT_REFRESH) { - this.start(); - } - } - - // Lifecycle implementation - - public final boolean isRunning() { - this.lifecycleLock.lock(); - try { - return this.running; - } - finally { - this.lifecycleLock.unlock(); - } - } - - public final void start() { - this.lifecycleLock.lock(); - try { - if (!this.running) { - this.doStart(); - this.running = true; - if (logger.isInfoEnabled()) { - logger.info("started " + this); - } - } - } - finally { - this.lifecycleLock.unlock(); - } - } - - public final void stop() { - this.lifecycleLock.lock(); - try { - if (this.running) { - this.doStop(); - this.running = false; - if (logger.isInfoEnabled()) { - logger.info("stopped " + this); - } - } - } - finally { - this.lifecycleLock.unlock(); - } - } - - protected void onInit() throws Exception { - } - - protected void onEvent(ApplicationEvent event) { - } - - /** - * Subclasses must implement this method with the start behavior. - * This method will be invoked while holding the {@link #lifecycleLock}. - */ - protected abstract void doStart(); - - /** - * Subclasses must implement this method with the stop behavior. - * This method will be invoked while holding the {@link #lifecycleLock}. - */ - protected abstract void doStop(); - -}