ThreadPoolTaskExecutor/Scheduler cancels remaining Futures on shutdown

Issue: SPR-16607
This commit is contained in:
Juergen Hoeller
2018-03-19 01:45:22 +01:00
parent 9d63f805b3
commit 3c1adf7f6a
8 changed files with 478 additions and 96 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package org.springframework.scheduling.concurrent;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -209,12 +210,28 @@ public abstract class ExecutorConfigurationSupport extends CustomizableThreadFac
this.executor.shutdown();
}
else {
this.executor.shutdownNow();
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
* @param task the task to cancel (potentially a {@link RunnableFuture})
* @since 5.0.5
* @see #shutdown()
* @see RunnableFuture#cancel(boolean)
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof RunnableFuture) {
((RunnableFuture<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of the
* {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package org.springframework.scheduling.concurrent;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
@@ -35,6 +36,7 @@ import org.springframework.core.task.TaskRejectedException;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
@@ -90,6 +92,10 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
@Nullable
private ThreadPoolExecutor threadPoolExecutor;
// Runnable decorator to user-level FutureTask, if different
private final Map<Runnable, Object> decoratedTaskMap =
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
/**
* Set the ThreadPoolExecutor's core pool size.
@@ -217,7 +223,11 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
@@ -353,6 +363,16 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
}
}
@Override
protected void cancelRemainingTask(Runnable task) {
super.cancelRemainingTask(task);
// Cancel associated user-level Future handle as well
Object original = this.decoratedTaskMap.get(task);
if (original instanceof Future) {
((Future<?>) original).cancel(true);
}
}
/**
* This task executor prefers short-lived work units.
*/

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package org.springframework.scheduling.concurrent;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
@@ -37,6 +38,7 @@ import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.TaskUtils;
import org.springframework.util.Assert;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.ErrorHandler;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
@@ -67,6 +69,10 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
@Nullable
private ScheduledExecutorService scheduledExecutor;
// Underlying ScheduledFutureTask to user-level ListenableFuture handle, if any
private final Map<Object, ListenableFuture<?>> listenableFutureMap =
new ConcurrentReferenceHashMap<>(16, ConcurrentReferenceHashMap.ReferenceType.WEAK);
/**
* Set the ScheduledExecutorService's pool size.
@@ -253,9 +259,9 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
public ListenableFuture<?> submitListenable(Runnable task) {
ExecutorService executor = getScheduledExecutor();
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
executor.execute(errorHandlingTask(future, false));
return future;
ListenableFutureTask<Object> listenableFuture = new ListenableFutureTask<>(task, null);
executeAndTrack(executor, listenableFuture);
return listenableFuture;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
@@ -266,15 +272,32 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
ExecutorService executor = getScheduledExecutor();
try {
ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
executor.execute(errorHandlingTask(future, false));
return future;
ListenableFutureTask<T> listenableFuture = new ListenableFutureTask<>(task);
executeAndTrack(executor, listenableFuture);
return listenableFuture;
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
private void executeAndTrack(ExecutorService executor, ListenableFutureTask<?> listenableFuture) {
Future<?> scheduledFuture = executor.submit(errorHandlingTask(listenableFuture, false));
this.listenableFutureMap.put(scheduledFuture, listenableFuture);
listenableFuture.addCallback(result -> listenableFutureMap.remove(scheduledFuture),
ex -> listenableFutureMap.remove(scheduledFuture));
}
@Override
protected void cancelRemainingTask(Runnable task) {
super.cancelRemainingTask(task);
// Cancel associated user-level ListenableFuture handle as well
ListenableFuture<?> listenableFuture = this.listenableFutureMap.get(task);
if (listenableFuture != null) {
listenableFuture.cancel(true);
}
}
@Override
public boolean prefersShortLivedTasks() {
return true;