added "Future submit(Runnable)" and "Future submit(Callable)" to AsyncTaskExecutor; SchedulingTaskExecutor interface extends AsyncTaskExecutor; added ExecutorServiceAdapter class as a standard wrapper for a Spring TaskExecutor; added ThreadPoolExecutorFactoryBean; reduced backport-concurrent support to TaskExecutor adapters

This commit is contained in:
Juergen Hoeller
2009-02-05 22:45:35 +00:00
parent 6cdc25d66a
commit 7e4fb09369
28 changed files with 940 additions and 1100 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package org.springframework.scheduling;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.task.AsyncTaskExecutor;
/**
* A {@link org.springframework.core.task.TaskExecutor} extension exposing
@@ -32,7 +32,7 @@ import org.springframework.core.task.TaskExecutor;
* @see org.springframework.core.task.TaskExecutor
* @see org.springframework.scheduling.commonj.WorkManagerTaskExecutor
*/
public interface SchedulingTaskExecutor extends TaskExecutor {
public interface SchedulingTaskExecutor extends AsyncTaskExecutor {
/**
* Does this <code>TaskExecutor</code> prefer short-lived tasks over

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,10 @@
package org.springframework.scheduling.backportconcurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionException;
@@ -29,11 +33,12 @@ import org.springframework.scheduling.SchedulingTaskExecutor;
* exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
*
* <p><b>NOTE:</b> This class implements Spring's
* {@link org.springframework.core.task.TaskExecutor} interface as well as
* {@link org.springframework.core.task.TaskExecutor} interface (and hence implicitly
* the standard Java 5 {@link java.util.concurrent.Executor} interface) as well as
* the JSR-166 {@link edu.emory.mathcs.backport.java.util.concurrent.Executor}
* interface, with the former being the primary interface, the other just
* serving as secondary convenience. For this reason, the exception handling
* follows the TaskExecutor contract rather than the Executor contract, in
* follows the TaskExecutor contract rather than the backport Executor contract, in
* particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
*
* <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows for
@@ -73,10 +78,11 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor, Executor
setConcurrentExecutor(concurrentExecutor);
}
/**
* Specify the JSR-166 backport concurrent executor to delegate to.
*/
public void setConcurrentExecutor(Executor concurrentExecutor) {
public final void setConcurrentExecutor(Executor concurrentExecutor) {
this.concurrentExecutor =
(concurrentExecutor != null ? concurrentExecutor : Executors.newSingleThreadExecutor());
}
@@ -85,7 +91,7 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor, Executor
* Return the JSR-166 backport concurrent executor that this adapter
* delegates to.
*/
public Executor getConcurrentExecutor() {
public final Executor getConcurrentExecutor() {
return this.concurrentExecutor;
}
@@ -104,6 +110,22 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor, Executor
}
}
public void execute(Runnable task, long startTimeout) {
execute(task);
}
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<Object>(task, null);
execute(future);
return future;
}
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<T>(task);
execute(future);
return future;
}
/**
* This task executor prefers short-lived work units.
*/

View File

@@ -1,285 +0,0 @@
/*
* Copyright 2002-2008 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.backportconcurrent;
import edu.emory.mathcs.backport.java.util.concurrent.Executors;
import edu.emory.mathcs.backport.java.util.concurrent.RejectedExecutionHandler;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.scheduling.support.DelegatingExceptionProofRunnable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/**
* {@link org.springframework.beans.factory.FactoryBean} that sets up
* a JSR-166 backport
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService}
* (by default:
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor}
* as implementation) and exposes it for bean references.
*
* <p>Allows for registration of {@link ScheduledExecutorTask ScheduledExecutorTasks},
* automatically starting the {@link ScheduledExecutorService} on initialization and
* cancelling it on destruction of the context. In scenarios that just require static
* registration of tasks at startup, there is no need to access the
* {@link ScheduledExecutorService} instance itself in application code.
*
* <p>Note that
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService}
* uses a {@link Runnable} instance that is shared between repeated executions,
* in contrast to Quartz which instantiates a new Job for each execution.
*
* <p><b>WARNING:</b> {@link Runnable Runnables} submitted via a native
* {@link java.util.concurrent.ScheduledExecutorService} are removed from
* the execution schedule once they throw an exception. If you would prefer
* to continue execution after such an exception, switch this FactoryBean's
* {@link #setContinueScheduledExecutionAfterException "continueScheduledExecutionAfterException"}
* property to "true".
*
* <p>This class is analogous to the
* {@link org.springframework.scheduling.timer.TimerFactoryBean}
* class for the JDK {@link java.util.Timer} facility.
*
* @author Juergen Hoeller
* @since 2.0.3
* @see ScheduledExecutorTask
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor
* @see org.springframework.scheduling.timer.TimerFactoryBean
*/
public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
private int poolSize = 1;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean exposeUnconfigurableExecutor = false;
private ScheduledExecutorTask[] scheduledExecutorTasks;
private boolean continueScheduledExecutionAfterException = false;
private boolean waitForTasksToCompleteOnShutdown = false;
private String beanName;
private ScheduledExecutorService executor;
/**
* Set the ScheduledExecutorService's pool size.
* Default is 1.
*/
public void setPoolSize(int poolSize) {
Assert.isTrue(poolSize > 0, "'poolSize' must be 1 or higher");
this.poolSize = poolSize;
}
/**
* Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
* Default is the ThreadPoolExecutor's default thread factory.
* @see edu.emory.mathcs.backport.java.util.concurrent.Executors#defaultThreadFactory()
*/
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (threadFactory != null ? threadFactory : Executors.defaultThreadFactory());
}
/**
* Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
* Default is the ThreadPoolExecutor's default abort policy.
* @see edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor.AbortPolicy
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler =
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}
/**
* Specify whether this FactoryBean should expose an unconfigurable
* decorator for the created executor.
* <p>Default is "false", exposing the raw executor as bean reference.
* Switch this flag to "true" to strictly prevent clients from
* modifying the executor's configuration.
* @see edu.emory.mathcs.backport.java.util.concurrent.Executors#unconfigurableScheduledExecutorService
*/
public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
}
/**
* Register a list of ScheduledExecutorTask objects with the ScheduledExecutorService
* that this FactoryBean creates. Depending on each ScheduledExecutorTask's settings,
* it will be registered via one of ScheduledExecutorService's schedule methods.
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#schedule(java.lang.Runnable, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
*/
public void setScheduledExecutorTasks(ScheduledExecutorTask[] scheduledExecutorTasks) {
this.scheduledExecutorTasks = scheduledExecutorTasks;
}
/**
* Specify whether to continue the execution of a scheduled task
* after it threw an exception.
* <p>Default is "false", matching the native behavior of a
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService}.
* Switch this flag to "true" for exception-proof execution of each task,
* continuing scheduled execution as in the case of successful execution.
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate
*/
public void setContinueScheduledExecutionAfterException(boolean continueScheduledExecutionAfterException) {
this.continueScheduledExecutionAfterException = continueScheduledExecutionAfterException;
}
/**
* Set whether to wait for scheduled tasks to complete on shutdown.
* <p>Default is "false". Switch this to "true" if you prefer
* fully completed tasks at the expense of a longer shutdown phase.
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#shutdown()
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#shutdownNow()
*/
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
public void setBeanName(String name) {
this.beanName = name;
}
public void afterPropertiesSet() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ScheduledExecutorService" +
(this.beanName != null ? " '" + this.beanName + "'" : ""));
}
ScheduledExecutorService executor =
createExecutor(this.poolSize, this.threadFactory, this.rejectedExecutionHandler);
// Register specified ScheduledExecutorTasks, if necessary.
if (!ObjectUtils.isEmpty(this.scheduledExecutorTasks)) {
registerTasks(this.scheduledExecutorTasks, executor);
}
// Wrap executor with an unconfigurable decorator.
this.executor = (this.exposeUnconfigurableExecutor ?
Executors.unconfigurableScheduledExecutorService(executor) : executor);
}
/**
* Create a new {@link ScheduledExecutorService} instance.
* Called by <code>afterPropertiesSet</code>.
* <p>The default implementation creates a {@link ScheduledThreadPoolExecutor}.
* Can be overridden in subclasses to provide custom
* {@link ScheduledExecutorService} instances.
* @param poolSize the specified pool size
* @param threadFactory the ThreadFactory to use
* @param rejectedExecutionHandler the RejectedExecutionHandler to use
* @return a new ScheduledExecutorService instance
* @see #afterPropertiesSet()
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor
*/
protected ScheduledExecutorService createExecutor(
int poolSize, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new ScheduledThreadPoolExecutor(poolSize, threadFactory, rejectedExecutionHandler);
}
/**
* Register the specified {@link ScheduledExecutorTask ScheduledExecutorTasks}
* on the given {@link ScheduledExecutorService}.
* @param tasks the specified ScheduledExecutorTasks (never empty)
* @param executor the ScheduledExecutorService to register the tasks on.
*/
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
for (int i = 0; i < tasks.length; i++) {
ScheduledExecutorTask task = tasks[i];
Runnable runnable = getRunnableToSchedule(task);
if (task.isOneTimeTask()) {
executor.schedule(runnable, task.getDelay(), task.getTimeUnit());
}
else {
if (task.isFixedRate()) {
executor.scheduleAtFixedRate(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
}
else {
executor.scheduleWithFixedDelay(runnable, task.getDelay(), task.getPeriod(), task.getTimeUnit());
}
}
}
}
/**
* Determine the actual Runnable to schedule for the given task.
* <p>Wraps the task's Runnable in a
* {@link org.springframework.scheduling.support.DelegatingExceptionProofRunnable}
* if necessary, according to the
* {@link #setContinueScheduledExecutionAfterException "continueScheduledExecutionAfterException"}
* flag.
* @param task the ScheduledExecutorTask to schedule
* @return the actual Runnable to schedule (may be a decorator)
*/
protected Runnable getRunnableToSchedule(ScheduledExecutorTask task) {
boolean propagateException = !this.continueScheduledExecutionAfterException;
return new DelegatingExceptionProofRunnable(task.getRunnable(), propagateException);
}
public Object getObject() {
return this.executor;
}
public Class getObjectType() {
return (this.executor != null ? this.executor.getClass() : ScheduledExecutorService.class);
}
public boolean isSingleton() {
return true;
}
/**
* Cancel the ScheduledExecutorService on bean factory shutdown,
* stopping all scheduled tasks.
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#shutdown()
*/
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Shutting down ScheduledExecutorService" +
(this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
this.executor.shutdownNow();
}
}
}

View File

@@ -1,202 +0,0 @@
/*
* Copyright 2002-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.backportconcurrent;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
/**
* JavaBean that describes a scheduled executor task, consisting of the
* {@link Runnable} and a delay plus period. The period needs to be specified;
* there is no point in a default for it.
*
* <p>The JSR-166 backport
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService}
* does not offer more sophisticated scheduling options such as cron expressions.
* Consider using Quartz for such advanced needs.
*
* <p>Note that the
* {@link edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService}
* mechanism uses a {@link Runnable} instance that is shared between repeated executions,
* in contrast to Quartz which creates a new Job instance for each execution.
*
* <p>This class is analogous to the {@link org.springframework.scheduling.timer.ScheduledTimerTask}
* class for the JDK {@link java.util.Timer} facility.
*
* @author Juergen Hoeller
* @since 2.0.3
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
* @see org.springframework.scheduling.timer.ScheduledTimerTask
*/
public class ScheduledExecutorTask {
private Runnable runnable;
private long delay = 0;
private long period = -1;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
private boolean fixedRate = false;
/**
* Create a new ScheduledExecutorTask,
* to be populated via bean properties.
* @see #setDelay
* @see #setPeriod
* @see #setFixedRate
*/
public ScheduledExecutorTask() {
}
/**
* Create a new ScheduledExecutorTask, with default
* one-time execution without delay.
* @param executorTask the Runnable to schedule
*/
public ScheduledExecutorTask(Runnable executorTask) {
this.runnable = executorTask;
}
/**
* Create a new ScheduledExecutorTask, with default
* one-time execution with the given delay.
* @param executorTask the Runnable to schedule
* @param delay the delay before starting the task for the first time (ms)
*/
public ScheduledExecutorTask(Runnable executorTask, long delay) {
this.runnable = executorTask;
this.delay = delay;
}
/**
* Create a new ScheduledExecutorTask.
* @param executorTask the Runnable to schedule
* @param delay the delay before starting the task for the first time (ms)
* @param period the period between repeated task executions (ms)
* @param fixedRate whether to schedule as fixed-rate execution
*/
public ScheduledExecutorTask(Runnable executorTask, long delay, long period, boolean fixedRate) {
this.runnable = executorTask;
this.delay = delay;
this.period = period;
this.fixedRate = fixedRate;
}
/**
* Set the Runnable to schedule as executor task.
*/
public void setRunnable(Runnable executorTask) {
this.runnable = executorTask;
}
/**
* Return the Runnable to schedule as executor task.
*/
public Runnable getRunnable() {
return this.runnable;
}
/**
* Set the delay before starting the task for the first time,
* in milliseconds. Default is 0, immediately starting the
* task after successful scheduling.
*/
public void setDelay(long delay) {
this.delay = delay;
}
/**
* Return the delay before starting the job for the first time.
*/
public long getDelay() {
return this.delay;
}
/**
* Set the period between repeated task executions, in milliseconds.
* <p>Default is -1, leading to one-time execution. In case of a positive value,
* the task will be executed repeatedly, with the given interval inbetween executions.
* <p>Note that the semantics of the period value vary between fixed-rate and
* fixed-delay execution.
* <p><b>Note:</b> A period of 0 (for example as fixed delay) is <i>not</i> supported,
* simply because <code>edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService</code> itself
* does not support it. Hence a value of 0 will be treated as one-time execution;
* however, that value should never be specified explicitly in the first place!
* @see #setFixedRate
* @see #isOneTimeTask()
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
*/
public void setPeriod(long period) {
this.period = period;
}
/**
* Return the period between repeated task executions.
*/
public long getPeriod() {
return this.period;
}
/**
* Is this task only ever going to execute once?
* @return <code>true</code> if this task is only ever going to execute once
* @see #getPeriod()
*/
public boolean isOneTimeTask() {
return (this.period <= 0);
}
/**
* Specify the time unit for the delay and period values.
* Default is milliseconds (<code>TimeUnit.MILLISECONDS</code>).
* @see edu.emory.mathcs.backport.java.util.concurrent.TimeUnit#MILLISECONDS
* @see edu.emory.mathcs.backport.java.util.concurrent.TimeUnit#SECONDS
*/
public void setTimeUnit(TimeUnit timeUnit) {
this.timeUnit = (timeUnit != null ? timeUnit : TimeUnit.MILLISECONDS);
}
/**
* Return the time unit for the delay and period values.
*/
public TimeUnit getTimeUnit() {
return this.timeUnit;
}
/**
* Set whether to schedule as fixed-rate execution, rather than
* fixed-delay execution. Default is "false", that is, fixed delay.
* <p>See ScheduledExecutorService javadoc for details on those execution modes.
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
* @see edu.emory.mathcs.backport.java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate(java.lang.Runnable, long, long, edu.emory.mathcs.backport.java.util.concurrent.TimeUnit)
*/
public void setFixedRate(boolean fixedRate) {
this.fixedRate = fixedRate;
}
/**
* Return whether to schedule as fixed-rate execution.
*/
public boolean isFixedRate() {
return this.fixedRate;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,10 @@
package org.springframework.scheduling.backportconcurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@@ -52,11 +56,12 @@ import org.springframework.util.Assert;
* simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
*
* <p><b>NOTE:</b> This class implements Spring's
* {@link org.springframework.core.task.TaskExecutor} interface as well as
* {@link org.springframework.core.task.TaskExecutor} interface (and hence implicitly
* the standard Java 5 {@link java.util.concurrent.Executor} interface) as well as
* the JSR-166 {@link edu.emory.mathcs.backport.java.util.concurrent.Executor}
* interface, with the former being the primary interface, the other just
* serving as secondary convenience. For this reason, the exception handling
* follows the TaskExecutor contract rather than the Executor contract, in
* follows the TaskExecutor contract rather than the backport Executor contract, in
* particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
*
* @author Juergen Hoeller
@@ -308,6 +313,22 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
}
}
public void execute(Runnable task, long startTimeout) {
execute(task);
}
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<Object>(task, null);
execute(future);
return future;
}
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<T>(task);
execute(future);
return future;
}
/**
* This task executor prefers short-lived work units.
*/

View File

@@ -3,8 +3,8 @@
Scheduling convenience classes for the
<a href="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">JSR-166 backport</a>
Executor mechanism, allowing to set up a ThreadPoolExecutor or
ScheduledThreadPoolExecutor as bean in a Spring context.
Executor mechanism, allowing to set up a ThreadPoolExecutor
as a TaskExecutor-compliant bean in a Spring context.
</body>
</html>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,23 +16,23 @@
package org.springframework.scheduling.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.SchedulingTaskExecutor;
/**
* Adapter that takes a JDK 1.5 <code>java.util.concurrent.Executor</code> and
* exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
*
* <p><b>NOTE:</b> This class implements Spring's
* {@link org.springframework.core.task.TaskExecutor} interface as well as the JDK 1.5
* {@link java.util.concurrent.Executor} interface, with the former being the primary
* interface, the other just serving as secondary convenience. For this reason, the
* exception handling follows the TaskExecutor contract rather than the Executor contract,
* in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
* Also detects an extended <code>java.util.concurrent.ExecutorService</code>, adapting
* the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
*
* <p>Note that there is a pre-built {@link ThreadPoolTaskExecutor} that allows for
* defining a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor} in bean style,
@@ -43,14 +43,17 @@ import org.springframework.scheduling.SchedulingTaskExecutor;
* @author Juergen Hoeller
* @since 2.0
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.ThreadPoolExecutor
* @see java.util.concurrent.Executors
* @see ThreadPoolTaskExecutor
*/
public class ConcurrentTaskExecutor implements SchedulingTaskExecutor, Executor {
public class ConcurrentTaskExecutor implements SchedulingTaskExecutor {
private Executor concurrentExecutor;
private TaskExecutorAdapter adaptedExecutor;
/**
* Create a new ConcurrentTaskExecutor,
@@ -74,32 +77,34 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor, Executor
/**
* Specify the JDK 1.5 concurrent executor to delegate to.
*/
public void setConcurrentExecutor(Executor concurrentExecutor) {
public final void setConcurrentExecutor(Executor concurrentExecutor) {
this.concurrentExecutor =
(concurrentExecutor != null ? concurrentExecutor : Executors.newSingleThreadExecutor());
this.adaptedExecutor = new TaskExecutorAdapter(this.concurrentExecutor);
}
/**
* Return the JDK 1.5 concurrent executor that this adapter
* delegates to.
* Return the JDK 1.5 concurrent executor that this adapter delegates to.
*/
public Executor getConcurrentExecutor() {
public final Executor getConcurrentExecutor() {
return this.concurrentExecutor;
}
/**
* Delegates to the specified JDK 1.5 concurrent executor.
* @see java.util.concurrent.Executor#execute(Runnable)
*/
public void execute(Runnable task) {
try {
this.concurrentExecutor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(
"Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
}
this.adaptedExecutor.execute(task);
}
public void execute(Runnable task, long startTimeout) {
this.adaptedExecutor.execute(task, startTimeout);
}
public Future<?> submit(Runnable task) {
return this.adaptedExecutor.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return this.adaptedExecutor.submit(task);
}
/**

View File

@@ -0,0 +1,160 @@
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
/**
* Base class for classes that are setting up a
* <code>java.util.concurrent.ExecutorService</code>
* (typically a {@link java.util.concurrent.ThreadPoolExecutor}).
* Defines common configuration settings and common lifecycle handling.
*
* @author Juergen Hoeller
* @since 3.0
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.Executors
* @see java.util.concurrent.ThreadPoolExecutor
*/
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory
implements BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
private ThreadFactory threadFactory = this;
private boolean threadNamePrefixSet = false;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean waitForTasksToCompleteOnShutdown = false;
private String beanName;
private ExecutorService executor;
/**
* Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
* Default is the ThreadPoolExecutor's default thread factory.
* @see java.util.concurrent.Executors#defaultThreadFactory()
*/
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (threadFactory != null ? threadFactory : this);
}
@Override
public void setThreadNamePrefix(String threadNamePrefix) {
super.setThreadNamePrefix(threadNamePrefix);
this.threadNamePrefixSet = true;
}
/**
* Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
* Default is the ThreadPoolExecutor's default abort policy.
* @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler =
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}
/**
* Set whether to wait for scheduled tasks to complete on shutdown.
* <p>Default is "false". Switch this to "true" if you prefer
* fully completed tasks at the expense of a longer shutdown phase.
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Calls <code>initialize()</code> after the container applied all property values.
* @see #initialize()
*/
public void afterPropertiesSet() {
initialize();
}
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
/**
* Create the target {@link java.util.concurrent.ExecutorService} instance.
* Called by <code>afterPropertiesSet</code>.
* @param threadFactory the ThreadFactory to use
* @param rejectedExecutionHandler the RejectedExecutionHandler to use
* @return a new ExecutorService instance
* @see #afterPropertiesSet()
*/
protected abstract ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler);
/**
* Calls <code>shutdown</code> when the BeanFactory destroys
* the task executor instance.
* @see #shutdown()
*/
public void destroy() {
shutdown();
}
/**
* Perform a shutdown on the ThreadPoolExecutor.
* @see java.util.concurrent.ExecutorService#shutdown()
*/
public void shutdown() {
if (logger.isInfoEnabled()) {
logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
this.executor.shutdownNow();
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,17 +16,13 @@
package org.springframework.scheduling.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
@@ -37,8 +33,8 @@ import org.springframework.util.ObjectUtils;
/**
* {@link org.springframework.beans.factory.FactoryBean} that sets up
* a JDK 1.5 {@link java.util.concurrent.ScheduledExecutorService}
* (by default: {@link java.util.concurrent.ScheduledThreadPoolExecutor}
* as implementation) and exposes it for bean references.
* (by default: a {@link java.util.concurrent.ScheduledThreadPoolExecutor})
* and exposes it for bean references.
*
* <p>Allows for registration of {@link ScheduledExecutorTask ScheduledExecutorTasks},
* automatically starting the {@link ScheduledExecutorService} on initialization and
@@ -66,29 +62,19 @@ import org.springframework.util.ObjectUtils;
* @see ScheduledExecutorTask
* @see java.util.concurrent.ScheduledExecutorService
* @see java.util.concurrent.ScheduledThreadPoolExecutor
* @see org.springframework.scheduling.timer.TimerFactoryBean
*/
public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
public class ScheduledExecutorFactoryBean extends ExecutorConfigurationSupport
implements FactoryBean<ScheduledExecutorService>, InitializingBean, DisposableBean {
private int poolSize = 1;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean exposeUnconfigurableExecutor = false;
private ScheduledExecutorTask[] scheduledExecutorTasks;
private boolean continueScheduledExecutionAfterException = false;
private boolean waitForTasksToCompleteOnShutdown = false;
private boolean exposeUnconfigurableExecutor = false;
private String beanName;
private ScheduledExecutorService executor;
private ScheduledExecutorService exposedExecutor;
/**
@@ -100,37 +86,6 @@ public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware,
this.poolSize = poolSize;
}
/**
* Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
* Default is the ThreadPoolExecutor's default thread factory.
* @see java.util.concurrent.Executors#defaultThreadFactory()
*/
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (threadFactory != null ? threadFactory : Executors.defaultThreadFactory());
}
/**
* Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
* Default is the ThreadPoolExecutor's default abort policy.
* @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler =
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}
/**
* Specify whether this FactoryBean should expose an unconfigurable
* decorator for the created executor.
* <p>Default is "false", exposing the raw executor as bean reference.
* Switch this flag to "true" to strictly prevent clients from
* modifying the executor's configuration.
* @see java.util.concurrent.Executors#unconfigurableScheduledExecutorService
*/
public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
}
/**
* Register a list of ScheduledExecutorTask objects with the ScheduledExecutorService
* that this FactoryBean creates. Depending on each ScheduledExecutorTask's settings,
@@ -157,28 +112,23 @@ public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware,
}
/**
* Set whether to wait for scheduled tasks to complete on shutdown.
* <p>Default is "false". Switch this to "true" if you prefer
* fully completed tasks at the expense of a longer shutdown phase.
* @see java.util.concurrent.ScheduledExecutorService#shutdown()
* @see java.util.concurrent.ScheduledExecutorService#shutdownNow()
* Specify whether this FactoryBean should expose an unconfigurable
* decorator for the created executor.
* <p>Default is "false", exposing the raw executor as bean reference.
* Switch this flag to "true" to strictly prevent clients from
* modifying the executor's configuration.
* @see java.util.concurrent.Executors#unconfigurableScheduledExecutorService
*/
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
public void setBeanName(String name) {
this.beanName = name;
public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
}
public void afterPropertiesSet() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ScheduledExecutorService" +
(this.beanName != null ? " '" + this.beanName + "'" : ""));
}
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
ScheduledExecutorService executor =
createExecutor(this.poolSize, this.threadFactory, this.rejectedExecutionHandler);
createExecutor(this.poolSize, threadFactory, rejectedExecutionHandler);
// Register specified ScheduledExecutorTasks, if necessary.
if (!ObjectUtils.isEmpty(this.scheduledExecutorTasks)) {
@@ -186,8 +136,10 @@ public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware,
}
// Wrap executor with an unconfigurable decorator.
this.executor = (this.exposeUnconfigurableExecutor ?
this.exposedExecutor = (this.exposeUnconfigurableExecutor ?
Executors.unconfigurableScheduledExecutorService(executor) : executor);
return executor;
}
/**
@@ -216,8 +168,7 @@ public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware,
* @param executor the ScheduledExecutorService to register the tasks on.
*/
protected void registerTasks(ScheduledExecutorTask[] tasks, ScheduledExecutorService executor) {
for (int i = 0; i < tasks.length; i++) {
ScheduledExecutorTask task = tasks[i];
for (ScheduledExecutorTask task : tasks) {
Runnable runnable = getRunnableToSchedule(task);
if (task.isOneTimeTask()) {
executor.schedule(runnable, task.getDelay(), task.getTimeUnit());
@@ -249,35 +200,16 @@ public class ScheduledExecutorFactoryBean implements FactoryBean, BeanNameAware,
}
public Object getObject() {
return this.executor;
public ScheduledExecutorService getObject() {
return this.exposedExecutor;
}
public Class getObjectType() {
return (this.executor != null ? this.executor.getClass() : ScheduledExecutorService.class);
public Class<? extends ScheduledExecutorService> getObjectType() {
return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ScheduledExecutorService.class);
}
public boolean isSingleton() {
return true;
}
/**
* Cancel the ScheduledExecutorService on bean factory shutdown,
* stopping all scheduled tasks.
* @see java.util.concurrent.ScheduledExecutorService#shutdown()
*/
public void destroy() {
if (logger.isInfoEnabled()) {
logger.info("Shutting down ScheduledExecutorService" +
(this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
}
else {
this.executor.shutdownNow();
}
}
}

View File

@@ -0,0 +1,184 @@
/*
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.scheduling.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
/**
* JavaBean that allows for configuring a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor}
* in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds",
* "queueCapacity" properties) and exposing it as a bean reference of its native
* {@link java.util.concurrent.ExecutorService} type.
*
* <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
* constructor injection, or use a factory method definition that points to the JDK 1.5
* {@link java.util.concurrent.Executors} class.
*
* <p><b>If you need a timing-based {@link java.util.concurrent.ScheduledExecutorService}
* instead, consider {@link ScheduledExecutorFactoryBean}.</b>
* @author Juergen Hoeller
* @since 3.0
* @see java.util.concurrent.ExecutorService
* @see java.util.concurrent.Executors
* @see java.util.concurrent.ThreadPoolExecutor
*/
public class ThreadPoolExecutorFactoryBean extends ExecutorConfigurationSupport
implements FactoryBean<ExecutorService>, InitializingBean, DisposableBean {
private int corePoolSize = 1;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private boolean allowCoreThreadTimeOut = false;
private int queueCapacity = Integer.MAX_VALUE;
private boolean exposeUnconfigurableExecutor = false;
private ExecutorService exposedExecutor;
/**
* Set the ThreadPoolExecutor's core pool size.
* Default is 1.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setCorePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
}
/**
* Set the ThreadPoolExecutor's maximum pool size.
* Default is <code>Integer.MAX_VALUE</code>.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setMaxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}
/**
* Set the ThreadPoolExecutor's keep-alive seconds.
* Default is 60.
* <p><b>This setting can be modified at runtime, for example through JMX.</b>
*/
public void setKeepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
/**
* Specify whether to allow core threads to time out. This enables dynamic
* growing and shrinking even in combination with a non-zero queue (since
* the max pool size will only grow once the queue is full).
* <p>Default is "false". Note that this feature is only available on Java 6
* or above. On Java 5, consider switching to the backport-concurrent
* version of ThreadPoolTaskExecutor which also supports this feature.
* @see java.util.concurrent.ThreadPoolExecutor#allowCoreThreadTimeOut(boolean)
*/
public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is <code>Integer.MAX_VALUE</code>.
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
* any other value will lead to a SynchronousQueue instance.
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
public void setQueueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
}
/**
* Specify whether this FactoryBean should expose an unconfigurable
* decorator for the created executor.
* <p>Default is "false", exposing the raw executor as bean reference.
* Switch this flag to "true" to strictly prevent clients from
* modifying the executor's configuration.
* @see java.util.concurrent.Executors#unconfigurableScheduledExecutorService
*/
public void setExposeUnconfigurableExecutor(boolean exposeUnconfigurableExecutor) {
this.exposeUnconfigurableExecutor = exposeUnconfigurableExecutor;
}
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
// Wrap executor with an unconfigurable decorator.
this.exposedExecutor = (this.exposeUnconfigurableExecutor ?
Executors.unconfigurableExecutorService(executor) : executor);
return executor;
}
/**
* Create the BlockingQueue to use for the ThreadPoolExecutor.
* <p>A LinkedBlockingQueue instance will be created for a positive
* capacity value; a SynchronousQueue else.
* @param queueCapacity the specified queue capacity
* @return the BlockingQueue instance
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue<Runnable>(queueCapacity);
}
else {
return new SynchronousQueue<Runnable>();
}
}
public ExecutorService getObject() throws Exception {
return this.exposedExecutor;
}
public Class<? extends ExecutorService> getObjectType() {
return (this.exposedExecutor != null ? this.exposedExecutor.getClass() : ExecutorService.class);
}
public boolean isSingleton() {
return true;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,10 @@
package org.springframework.scheduling.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
@@ -26,12 +29,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
@@ -39,16 +36,16 @@ import org.springframework.util.Assert;
/**
* JavaBean that allows for configuring a JDK 1.5 {@link java.util.concurrent.ThreadPoolExecutor}
* in bean style (through its "corePoolSize", "maxPoolSize", "keepAliveSeconds", "queueCapacity"
* properties), exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
* This is an alternative to configuring a ThreadPoolExecutor instance directly using
* constructor injection, with a separate {@link ConcurrentTaskExecutor} adapter wrapping it.
* properties) and exposing it as a Spring {@link org.springframework.core.task.TaskExecutor}.
* This class is also well suited for management and monitoring (e.g. through JMX),
* providing several useful attributes: "corePoolSize", "maxPoolSize", "keepAliveSeconds"
* (all supporting updates at runtime); "poolSize", "activeCount" (for introspection only).
*
* <p>For any custom needs, in particular for defining a
* {@link java.util.concurrent.ScheduledThreadPoolExecutor}, it is recommended to
* use a straight definition of the Executor instance or a factory method definition
* that points to the JDK 1.5 {@link java.util.concurrent.Executors} class.
* To expose such a raw Executor as a Spring {@link org.springframework.core.task.TaskExecutor},
* simply wrap it with a {@link ConcurrentTaskExecutor} adapter.
* <p>For an alternative, you may set up a ThreadPoolExecutor instance directly using
* constructor injection, or use a factory method definition that points to the JDK 1.5
* {@link java.util.concurrent.Executors} class. To expose such a raw Executor as a
* Spring {@link org.springframework.core.task.TaskExecutor}, simply wrap it with a
* {@link org.springframework.scheduling.concurrent.ConcurrentTaskExecutor} adapter.
*
* <p><b>NOTE:</b> This class implements Spring's
* {@link org.springframework.core.task.TaskExecutor} interface as well as the JDK 1.5
@@ -57,19 +54,16 @@ import org.springframework.util.Assert;
* exception handling follows the TaskExecutor contract rather than the Executor contract,
* in particular regarding the {@link org.springframework.core.task.TaskRejectedException}.
*
* <p><b>If you prefer native {@link java.util.concurrent.ExecutorService} exposure instead,
* consider {@link ThreadPoolExecutorFactoryBean} as an alternative to this class.</b>
*
* @author Juergen Hoeller
* @since 2.0
* @see org.springframework.core.task.TaskExecutor
* @see java.util.concurrent.Executor
* @see java.util.concurrent.ThreadPoolExecutor
* @see java.util.concurrent.ScheduledThreadPoolExecutor
* @see java.util.concurrent.Executors
* @see ConcurrentTaskExecutor
*/
public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
implements SchedulingTaskExecutor, Executor, BeanNameAware, InitializingBean, DisposableBean {
protected final Log logger = LogFactory.getLog(getClass());
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor {
private final Object poolSizeMonitor = new Object();
@@ -83,16 +77,6 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
private int queueCapacity = Integer.MAX_VALUE;
private ThreadFactory threadFactory = this;
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
private boolean waitForTasksToCompleteOnShutdown = false;
private boolean threadNamePrefixSet = false;
private String beanName;
private ThreadPoolExecutor threadPoolExecutor;
@@ -190,76 +174,20 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
this.queueCapacity = queueCapacity;
}
/**
* Set the ThreadFactory to use for the ThreadPoolExecutor's thread pool.
* <p>Default is this executor itself (i.e. the factory that this executor
* inherits from). See {@link org.springframework.util.CustomizableThreadCreator}'s
* javadoc for available bean properties.
* @see #setThreadPriority
* @see #setDaemon
*/
public void setThreadFactory(ThreadFactory threadFactory) {
this.threadFactory = (threadFactory != null ? threadFactory : this);
}
/**
* Set the RejectedExecutionHandler to use for the ThreadPoolExecutor.
* Default is the ThreadPoolExecutor's default abort policy.
* @see java.util.concurrent.ThreadPoolExecutor.AbortPolicy
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler rejectedExecutionHandler) {
this.rejectedExecutionHandler =
(rejectedExecutionHandler != null ? rejectedExecutionHandler : new ThreadPoolExecutor.AbortPolicy());
}
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
/**
* Set whether to wait for scheduled tasks to complete on shutdown.
* <p>Default is "false". Switch this to "true" if you prefer
* fully completed tasks at the expense of a longer shutdown phase.
* @see java.util.concurrent.ThreadPoolExecutor#shutdown()
* @see java.util.concurrent.ThreadPoolExecutor#shutdownNow()
*/
public void setWaitForTasksToCompleteOnShutdown(boolean waitForJobsToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForJobsToCompleteOnShutdown;
}
@Override
public void setThreadNamePrefix(String threadNamePrefix) {
super.setThreadNamePrefix(threadNamePrefix);
this.threadNamePrefixSet = true;
}
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Calls <code>initialize()</code> after the container applied all property values.
* @see #initialize()
*/
public void afterPropertiesSet() {
initialize();
}
/**
* Creates the BlockingQueue and the ThreadPoolExecutor.
* @see #createQueue
*/
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ThreadPoolExecutor" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
BlockingQueue queue = createQueue(this.queueCapacity);
this.threadPoolExecutor = new ThreadPoolExecutor(
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, this.threadFactory, this.rejectedExecutionHandler);
queue, threadFactory, rejectedExecutionHandler);
if (this.allowCoreThreadTimeOut) {
this.threadPoolExecutor.allowCoreThreadTimeOut(true);
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
/**
@@ -271,12 +199,12 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
*/
protected BlockingQueue createQueue(int queueCapacity) {
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
if (queueCapacity > 0) {
return new LinkedBlockingQueue(queueCapacity);
return new LinkedBlockingQueue<Runnable>(queueCapacity);
}
else {
return new SynchronousQueue();
return new SynchronousQueue<Runnable>();
}
}
@@ -290,31 +218,6 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
return this.threadPoolExecutor;
}
/**
* Implementation of both the JDK 1.5 Executor interface and the Spring
* TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
* @see java.util.concurrent.Executor#execute(Runnable)
* @see org.springframework.core.task.TaskExecutor#execute(Runnable)
*/
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
/**
* This task executor prefers short-lived work units.
*/
public boolean prefersShortLivedTasks() {
return true;
}
/**
* Return the current pool size.
* @see java.util.concurrent.ThreadPoolExecutor#getPoolSize()
@@ -333,28 +236,50 @@ public class ThreadPoolTaskExecutor extends CustomizableThreadFactory
/**
* Calls <code>shutdown</code> when the BeanFactory destroys
* the task executor instance.
* @see #shutdown()
* Implementation of both the JDK 1.5 Executor interface and the Spring
* TaskExecutor interface, delegating to the ThreadPoolExecutor instance.
* @see java.util.concurrent.Executor#execute(Runnable)
* @see org.springframework.core.task.TaskExecutor#execute(Runnable)
*/
public void destroy() {
shutdown();
public void execute(Runnable task) {
Executor executor = getThreadPoolExecutor();
try {
executor.execute(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
public void execute(Runnable task, long startTimeout) {
execute(task);
}
public Future<?> submit(Runnable task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
public <T> Future<T> submit(Callable<T> task) {
ExecutorService executor = getThreadPoolExecutor();
try {
return executor.submit(task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
/**
* Perform a shutdown on the ThreadPoolExecutor.
* @see java.util.concurrent.ThreadPoolExecutor#shutdown()
* This task executor prefers short-lived work units.
*/
public void shutdown() {
if (logger.isInfoEnabled()) {
logger.info("Shutting down ThreadPoolExecutor" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.waitForTasksToCompleteOnShutdown) {
this.threadPoolExecutor.shutdown();
}
else {
this.threadPoolExecutor.shutdownNow();
}
public boolean prefersShortLivedTasks() {
return true;
}
}

View File

@@ -2,9 +2,10 @@
<body>
Scheduling convenience classes for the JDK 1.5+ Executor mechanism
in the <code>java.util.concurrent</code> package, allowing to set
up a ThreadPoolExecutor or ScheduledThreadPoolExecutor as bean in
a Spring context.
in the <code>java.util.concurrent</code> package, allowing to set up a
ThreadPoolExecutor or ScheduledThreadPoolExecutor as a bean in a Spring
context. Provides support for the native <code>java.util.concurrent</code>
interfaces as well as the Spring <code>TaskExecutor</code> mechanism.
</body>
</html>

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,10 +103,7 @@ public class TimerFactoryBean implements FactoryBean, BeanNameAware, Initializin
/**
* Create a new Timer instance. Called by <code>afterPropertiesSet</code>.
* Can be overridden in subclasses to provide custom Timer subclasses.
* <p>Uses the specified name as Timer thread name on JDK 1.5,
* simply falling back to a default Timer thread on JDK 1.4.
* @param name the desired name of the Timer's associated thread
* (applied on JDK 1.5 and higher; ignored on JDK 1.4)
* @param daemon whether to create a Timer that runs as daemon thread
* @return a new Timer instance
* @see #afterPropertiesSet()

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2008 the original author or authors.
* Copyright 2002-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,9 @@
package org.springframework.scheduling.timer;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -41,7 +44,7 @@ public class TimerTaskExecutor implements SchedulingTaskExecutor, InitializingBe
private Timer timer;
private int delay = 0;
private long delay = 0;
private boolean timerInternal = false;
@@ -76,11 +79,13 @@ public class TimerTaskExecutor implements SchedulingTaskExecutor, InitializingBe
}
/**
* Set the delay to use for scheduling tasks passed into the
* <code>execute</code> method. Default is 0.
* Set the delay to use for scheduling tasks passed into the plain
* {@link #execute(Runnable)} method. Default is 0.
* <p>Note that calls to {@link #execute(Runnable, long)} will use the
* given timeout as delay if it is lower than the general delay.
* @param delay the delay in milliseconds before the task is to be executed
*/
public void setDelay(int delay) {
public void setDelay(long delay) {
this.delay = delay;
}
@@ -117,6 +122,24 @@ public class TimerTaskExecutor implements SchedulingTaskExecutor, InitializingBe
this.timer.schedule(new DelegatingTimerTask(task), this.delay);
}
public void execute(Runnable task, long startTimeout) {
Assert.notNull(this.timer, "Timer is required");
long actualDelay = (startTimeout < this.delay ? startTimeout : this.delay);
this.timer.schedule(new DelegatingTimerTask(task), actualDelay);
}
public Future<?> submit(Runnable task) {
FutureTask<Object> future = new FutureTask<Object>(task, null);
execute(future);
return future;
}
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> future = new FutureTask<T>(task);
execute(future);
return future;
}
/**
* This task executor prefers short-lived work units.
*/