GH-3055: Fix DirectMLContainer for taskScheduler.shutdown()

Fixes: #3055
Issue link: https://github.com/spring-projects/spring-amqp/issues/3057

The internal `DirectMessageListenerContainer.taskScheduler` is not destroyed
when application context is closed.
The current `doStop()` implementation is only called from `Lifecycle.stop()`.
However, the application context calls the `SmartLifecycle.stop(Runnable)`.
That one, in turn, in the `AbstractMessageListenerContainer` calls a `shutdown()`,
which does not clean up `taskScheduler` in the `DirectMessageListenerContainer`
implementation.
In fact, this `shutdown()` is called from other volatile places in the `DirectMessageListenerContainer`,
where assumption is that `taskScheduler` is active.
Therefore, we cannot move `doStop()` extension into the `actualShutDown()` implementation.

* Extract `cleanUpTaskScheduler()` method in the `DirectMessageListenerContainer`
and call it from existing `doStop()`, from overridden `stop(Runnable)` and `destroy()`

**Auto-cherry-pick to `3.1.x`**
This commit is contained in:
Artem Bilan
2025-05-02 14:21:53 -04:00
parent bf53ac2885
commit 5480b2b215
2 changed files with 41 additions and 2 deletions

View File

@@ -425,14 +425,24 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
}
@Override
protected void doStop() {
super.doStop();
public void stop(Runnable callback) {
super.stop(callback);
cleanUpTaskScheduler();
}
private void cleanUpTaskScheduler() {
if (!this.taskSchedulerSet && this.taskScheduler != null) {
((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
this.taskScheduler = null;
}
}
@Override
protected void doStop() {
super.doStop();
cleanUpTaskScheduler();
}
protected void actualStart() {
this.aborted = false;
this.hasStopped = false;
@@ -975,6 +985,12 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta
// default empty
}
@Override
public void destroy() {
super.destroy();
cleanUpTaskScheduler();
}
/**
* The consumer object.
*/

View File

@@ -18,6 +18,7 @@ package org.springframework.amqp.rabbit.listener;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.rabbitmq.client.AMQP.BasicProperties;
@@ -139,4 +140,26 @@ public class ContainerShutDownTests {
}
}
@Test
void directMessageListenerContainerShutdownsItsSchedulerOnStopWithCallback() {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
container.setConnectionFactory(cf);
container.setQueueNames("test.shutdown");
container.setMessageListener(m -> {
});
container.start();
ScheduledExecutorService scheduledExecutorService =
TestUtils.getPropertyValue(container, "taskScheduler.scheduledExecutor", ScheduledExecutorService.class);
container.stop(() -> {
});
cf.destroy();
assertThat(scheduledExecutorService.isShutdown()).isTrue();
}
}