Removed the LifeycleSupport base class. SimpleTaskScheduler and AbstractEndpoint now provide their own Lifecycle implementations. Added an IntegrationObjectSupport class with convenient access to the 'channelResolver' and 'taskScheduler' instances.

This commit is contained in:
Mark Fisher
2008-11-13 17:49:13 +00:00
parent 6c554f929d
commit 6b468da46d
7 changed files with 264 additions and 216 deletions

View File

@@ -50,8 +50,7 @@ public class ApplicationEventInboundChannelAdapter extends MessageProducerSuppor
}
}
@Override
protected void onEvent(ApplicationEvent event) {
public void onApplicationEvent(ApplicationEvent event) {
if (CollectionUtils.isEmpty(this.eventTypes)) {
this.sendEventAsMessage(event);
return;

View File

@@ -29,12 +29,11 @@ import org.springframework.integration.channel.PollableChannel;
import org.springframework.integration.channel.SubscribableChannel;
import org.springframework.integration.core.MessageChannel;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.message.MessageHandler;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.scheduling.Trigger;
import org.springframework.integration.util.LifecycleSupport.AutoStartMode;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.util.Assert;
@@ -176,9 +175,7 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
throw new IllegalArgumentException(
"unsupported channel type: [" + channel.getClass() + "]");
}
if (!this.autoStartup) {
this.endpoint.setAutoStartMode(AutoStartMode.NONE);
}
this.endpoint.setAutoStartup(this.autoStartup);
this.endpoint.setBeanName(this.beanName);
this.endpoint.setBeanFactory(this.beanFactory);
if (this.endpoint instanceof InitializingBean) {
@@ -189,7 +186,9 @@ public class ConsumerEndpointFactoryBean implements FactoryBean, BeanFactoryAwar
}
public void onApplicationEvent(ApplicationEvent event) {
this.endpoint.onApplicationEvent(event);
if (this.endpoint instanceof ApplicationListener) {
((ApplicationListener) this.endpoint).onApplicationEvent(event);
}
}
}

View File

@@ -23,7 +23,6 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
import org.springframework.integration.scheduling.IntervalTrigger;
import org.springframework.integration.util.LifecycleSupport.AutoStartMode;
import org.springframework.util.Assert;
import org.springframework.util.xml.DomUtils;
@@ -53,10 +52,7 @@ public abstract class AbstractPollingInboundChannelAdapterParser extends Abstrac
else {
adapterBuilder.addPropertyValue("trigger", new IntervalTrigger(this.getDefaultPollInterval()));
}
String autoStart = element.getAttribute("auto-startup");
if ("false".equals(autoStart)) {
adapterBuilder.addPropertyValue("autoStartMode", AutoStartMode.NONE);
}
IntegrationNamespaceUtils.setValueIfAttributeDefined(adapterBuilder, element, "auto-startup");
return adapterBuilder.getBeanDefinition();
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.context;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.util.Assert;
/**
* A base class that provides convenient access to the bean factory as
* well as {@link ChannelResolver} and {@link TaskScheduler} instances.
*
* <p>This is intended to be used as a base class for internal framework
* components whereas code built upon the integration framework should not
* require tight coupling with the context but rather rely on standard
* dependency injection.
*
* @author Mark Fisher
*/
public abstract class IntegrationObjectSupport implements BeanNameAware, BeanFactoryAware {
/** Logger that is available to subclasses */
protected final Log logger = LogFactory.getLog(getClass());
private volatile String beanName;
private volatile BeanFactory beanFactory;
private volatile ChannelResolver channelResolver;
private volatile TaskScheduler taskScheduler;
public void setBeanName(String beanName) {
this.beanName = beanName;
}
public final void setBeanFactory(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "beanFactory must not be null");
this.beanFactory = beanFactory;
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory);
if (taskScheduler != null) {
this.taskScheduler = taskScheduler;
}
}
protected BeanFactory getBeanFactory() {
return this.beanFactory;
}
protected ChannelResolver getChannelResolver() {
return this.channelResolver;
}
protected TaskScheduler getTaskScheduler() {
return this.taskScheduler;
}
protected void setTaskScheduler(TaskScheduler taskScheduler) {
Assert.notNull(taskScheduler, "taskScheduler must not be null");
this.taskScheduler = taskScheduler;
}
@Override
public String toString() {
return (this.beanName != null) ? this.beanName : super.toString();
}
}

View File

@@ -16,66 +16,112 @@
package org.springframework.integration.endpoint;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.ChannelResolver;
import org.springframework.integration.context.IntegrationContextUtils;
import java.util.concurrent.locks.ReentrantLock;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.Lifecycle;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.scheduling.TaskScheduler;
import org.springframework.integration.util.LifecycleSupport;
import org.springframework.util.Assert;
/**
* The base class for Message Endpoint implementations.
*
* <p>This class implements Lifecycle and provides an {@link #autoStartup}
* property. If <code>true</code>, the endpoint will start automatically upon
* initialization. Otherwise, it will require an explicit invocation of its
* {@link #start()} method. The default value is <code>true</code>.
* To require explicit startup, provide a value of <code>false</code>
* to the {@link #setAutoStartup(boolean)} method.
*
* @author Mark Fisher
*/
public abstract class AbstractEndpoint extends LifecycleSupport implements BeanNameAware, BeanFactoryAware {
public abstract class AbstractEndpoint extends IntegrationObjectSupport implements Lifecycle, InitializingBean {
private volatile String beanName;
private volatile boolean autoStartup = true;
private volatile BeanFactory beanFactory;
private volatile boolean running;
private volatile ChannelResolver channelResolver;
private volatile TaskScheduler taskScheduler;
private final ReentrantLock lifecycleLock = new ReentrantLock();
public void setBeanName(String beanName) {
this.beanName = beanName;
}
public final void setBeanFactory(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "beanFactory must not be null");
this.beanFactory = beanFactory;
this.channelResolver = new BeanFactoryChannelResolver(beanFactory);
TaskScheduler taskScheduler = IntegrationContextUtils.getTaskScheduler(beanFactory);
if (taskScheduler != null) {
this.setTaskScheduler(taskScheduler);
}
}
protected BeanFactory getBeanFactory() {
return this.beanFactory;
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
public void setTaskScheduler(TaskScheduler taskScheduler) {
Assert.notNull(taskScheduler, "taskScheduler must not be null");
this.taskScheduler = taskScheduler;
super.setTaskScheduler(taskScheduler);
}
protected TaskScheduler getTaskScheduler() {
return this.taskScheduler;
public final void afterPropertiesSet() {
try {
this.onInit();
if (this.autoStartup) {
this.start();
}
}
catch (Exception e) {
throw new BeanInitializationException("failed to initialize", e);
}
}
protected ChannelResolver getChannelResolver() {
return this.channelResolver;
// Lifecycle implementation
public final boolean isRunning() {
this.lifecycleLock.lock();
try {
return this.running;
}
finally {
this.lifecycleLock.unlock();
}
}
@Override
public String toString() {
return (this.beanName != null) ? this.beanName : super.toString();
public final void start() {
this.lifecycleLock.lock();
try {
if (!this.running) {
this.doStart();
this.running = true;
if (logger.isInfoEnabled()) {
logger.info("started " + this);
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
public final void stop() {
this.lifecycleLock.lock();
try {
if (this.running) {
this.doStop();
this.running = false;
if (logger.isInfoEnabled()) {
logger.info("stopped " + this);
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
protected void onInit() throws Exception {
}
/**
* Subclasses must implement this method with the start behavior.
* This method will be invoked while holding the {@link #lifecycleLock}.
*/
protected abstract void doStart();
/**
* Subclasses must implement this method with the stop behavior.
* This method will be invoked while holding the {@link #lifecycleLock}.
*/
protected abstract void doStop();
}

View File

@@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,26 +34,38 @@ import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.BeanFactoryChannelResolver;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandler;
import org.springframework.integration.util.LifecycleSupport;
import org.springframework.scheduling.SchedulingException;
import org.springframework.util.Assert;
/**
* An implementation of {@link TaskScheduler} that delegates to a {@link TaskExecutor}.
*
* An implementation of {@link TaskScheduler} that delegates to any instance
* of {@link TaskExecutor}.
*
* <p>This class implements Lifecycle and provides an {@link #autoStartup}
* property. If <code>true</code>, the scheduler will start automatically upon
* receiving the {@link ContextRefreshedEvent}. Otherwise, it will require an
* explicit invocation of its {@link #start()} method. The default value is
* <code>true</code>. To require explicit startup, provide a value of
* <code>false</code> to the {@link #setAutoStartup(boolean)} method.
*
* @author Mark Fisher
* @author Marius Bogoevici
*/
public class SimpleTaskScheduler extends LifecycleSupport implements TaskScheduler, BeanFactoryAware, DisposableBean {
public class SimpleTaskScheduler implements TaskScheduler, BeanFactoryAware, ApplicationListener, DisposableBean {
private final Log logger = LogFactory.getLog(this.getClass());
private final TaskExecutor executor;
private volatile boolean autoStartup = true;
private volatile ErrorHandler errorHandler;
private volatile SchedulerTask schedulerTask = null;
@@ -61,14 +74,21 @@ public class SimpleTaskScheduler extends LifecycleSupport implements TaskSchedul
private final Set<TriggeredTask<?>> executingTasks = Collections.synchronizedSet(new TreeSet<TriggeredTask<?>>());
private volatile boolean running;
private final ReentrantLock lifecycleLock = new ReentrantLock();
public SimpleTaskScheduler(TaskExecutor executor) {
Assert.notNull(executor, "executor must not be null");
this.executor = executor;
this.setAutoStartMode(AutoStartMode.ON_CONTEXT_REFRESH);
}
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
@@ -95,28 +115,66 @@ public class SimpleTaskScheduler extends LifecycleSupport implements TaskSchedul
}
// LifecycleSupport implementation
// Lifecycle implementation
@Override // guarded by super#lifecycleLock
protected void doStart() {
this.executor.execute(this.schedulerTask = new SchedulerTask());
public final boolean isRunning() {
this.lifecycleLock.lock();
try {
return this.running;
}
finally {
this.lifecycleLock.unlock();
}
}
@Override // guarded by super#lifecycleLock
protected void doStop() {
this.schedulerTask.deactivate();
Thread executingThread = this.schedulerTask.executingThread.get();
if (executingThread != null) {
executingThread.interrupt();
}
this.scheduledTasks.clear();
synchronized (this.executingTasks) {
for (TriggeredTask<?> task : this.executingTasks) {
task.cancel(true);
public final void start() {
this.lifecycleLock.lock();
try {
if (!this.running) {
this.executor.execute(this.schedulerTask = new SchedulerTask());
this.running = true;
if (logger.isInfoEnabled()) {
logger.info("started " + this);
}
}
this.executingTasks.clear();
}
this.schedulerTask = null;
finally {
this.lifecycleLock.unlock();
}
}
public final void stop() {
this.lifecycleLock.lock();
try {
if (this.running) {
this.schedulerTask.deactivate();
Thread executingThread = this.schedulerTask.executingThread.get();
if (executingThread != null) {
executingThread.interrupt();
}
this.scheduledTasks.clear();
synchronized (this.executingTasks) {
for (TriggeredTask<?> task : this.executingTasks) {
task.cancel(true);
}
this.executingTasks.clear();
}
this.schedulerTask = null;
this.running = false;
if (logger.isInfoEnabled()) {
logger.info("stopped " + this);
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
public final void onApplicationEvent(ApplicationEvent event) {
if (event instanceof ContextRefreshedEvent && this.autoStartup) {
this.start();
}
}
public void destroy() throws Exception {

View File

@@ -1,142 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.util;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.Lifecycle;
import org.springframework.context.event.ContextRefreshedEvent;
/**
* A convenience base class for Lifecycle components that supports an
* "auto-startup" mode property. Depending on the mode, the component can
* be started either upon initialization, upon receiving the
* {@link ContextRefreshedEvent}, or may require an explicit start invocation.
* The timing of the startup is determined by the value of {@link #autoStartMode}.
* The default value is {@link AutoStartMode#ON_INIT}. To require explicit startup,
* set the mode to {@link AutoStartMode#NONE} using the
* {@link #setAutoStartMode(AutoStartMode)} method.
*
* @author Mark Fisher
*/
public abstract class LifecycleSupport implements Lifecycle, InitializingBean, ApplicationListener {
public static enum AutoStartMode { ON_INIT, ON_CONTEXT_REFRESH, NONE }
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile AutoStartMode autoStartMode = AutoStartMode.ON_INIT;
private volatile boolean running;
private final ReentrantLock lifecycleLock = new ReentrantLock();
public void setAutoStartMode(AutoStartMode autoStartMode) {
this.autoStartMode = (autoStartMode != null) ? autoStartMode : AutoStartMode.NONE;
}
public final void afterPropertiesSet() {
try {
this.onInit();
if (this.autoStartMode == AutoStartMode.ON_INIT) {
this.start();
}
}
catch (Exception e) {
throw new BeanInitializationException("failed to initialize", e);
}
}
public final void onApplicationEvent(ApplicationEvent event) {
this.onEvent(event);
if (event instanceof ContextRefreshedEvent && this.autoStartMode == AutoStartMode.ON_CONTEXT_REFRESH) {
this.start();
}
}
// Lifecycle implementation
public final boolean isRunning() {
this.lifecycleLock.lock();
try {
return this.running;
}
finally {
this.lifecycleLock.unlock();
}
}
public final void start() {
this.lifecycleLock.lock();
try {
if (!this.running) {
this.doStart();
this.running = true;
if (logger.isInfoEnabled()) {
logger.info("started " + this);
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
public final void stop() {
this.lifecycleLock.lock();
try {
if (this.running) {
this.doStop();
this.running = false;
if (logger.isInfoEnabled()) {
logger.info("stopped " + this);
}
}
}
finally {
this.lifecycleLock.unlock();
}
}
protected void onInit() throws Exception {
}
protected void onEvent(ApplicationEvent event) {
}
/**
* Subclasses must implement this method with the start behavior.
* This method will be invoked while holding the {@link #lifecycleLock}.
*/
protected abstract void doStart();
/**
* Subclasses must implement this method with the stop behavior.
* This method will be invoked while holding the {@link #lifecycleLock}.
*/
protected abstract void doStop();
}