GH-3055: Fix DirectMLContainer for taskScheduler.shutdown()

Fixes: #3055
Issue link: https://github.com/spring-projects/spring-amqp/issues/3057

The internal `DirectMessageListenerContainer.taskScheduler` is not destroyed
when application context is closed.
The current `doStop()` implementation is only called from `Lifecycle.stop()`.
However, the application context calls the `SmartLifecycle.stop(Runnable)`.
That one, in turn, in the `AbstractMessageListenerContainer` calls a `shutdown()`,
which does not clean up `taskScheduler` in the `DirectMessageListenerContainer`
implementation.
In fact, this `shutdown()` is called from other volatile places in the `DirectMessageListenerContainer`,
where assumption is that `taskScheduler` is active.
Therefore, we cannot move `doStop()` extension into the `actualShutDown()` implementation.

* Extract `cleanUpTaskScheduler()` method in the `DirectMessageListenerContainer`
and call it from existing `doStop()`, from overridden `stop(Runnable)` and `destroy()`

(cherry picked from commit 5480b2b215)
This commit is contained in:
Artem Bilan
2025-05-02 14:21:53 -04:00
committed by Spring Builds
parent cdb28fe0de
commit e9a3fd31f3
2 changed files with 41 additions and 2 deletions

View File

@@ -426,14 +426,24 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
}
@Override
protected void doStop() {
super.doStop();
public void stop(Runnable callback) {
super.stop(callback);
cleanUpTaskScheduler();
}
private void cleanUpTaskScheduler() {
if (!this.taskSchedulerSet && this.taskScheduler != null) {
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
this.taskScheduler = null;
}
}
@Override
protected void doStop() {
super.doStop();
cleanUpTaskScheduler();
}
protected void actualStart() {
this.aborted = false;
this.hasStopped = false;
@@ -976,6 +986,12 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
// default empty
}
@Override
public void destroy() {
super.destroy();
cleanUpTaskScheduler();
}
/**
* The consumer object.
*/

View File

@@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
@@ -140,4 +141,26 @@ public class ContainerShutDownTests {
}
}
@Test
void directMessageListenerContainerShutdownsItsSchedulerOnStopWithCallback() {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
container.setConnectionFactory(cf);
container.setQueueNames("test.shutdown");
container.setMessageListener(m -> {
});
container.start();
ScheduledExecutorService scheduledExecutorService =
TestUtils.getPropertyValue(container, "taskScheduler.scheduledExecutor", ScheduledExecutorService.class);
container.stop(() -> {
});
cf.destroy();
assertThat(scheduledExecutorService.isShutdown()).isTrue();
}
}