diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java index 44ffc36f..8179099b 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java @@ -426,14 +426,24 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta } @Override - protected void doStop() { - super.doStop(); + public void stop(Runnable callback) { + super.stop(callback); + cleanUpTaskScheduler(); + } + + private void cleanUpTaskScheduler() { if (!this.taskSchedulerSet && this.taskScheduler != null) { ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown(); this.taskScheduler = null; } } + @Override + protected void doStop() { + super.doStop(); + cleanUpTaskScheduler(); + } + protected void actualStart() { this.aborted = false; this.hasStopped = false; @@ -976,6 +986,12 @@ public class DirectMessageListenerContainer extends AbstractMessageListenerConta // default empty } + @Override + public void destroy() { + super.destroy(); + cleanUpTaskScheduler(); + } + /** * The consumer object. */ diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java index 1902921a..79ccd4ab 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/ContainerShutDownTests.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.Test; @@ -140,4 +141,26 @@ public class ContainerShutDownTests { } } + @Test + void directMessageListenerContainerShutdownsItsSchedulerOnStopWithCallback() { + DirectMessageListenerContainer container = new DirectMessageListenerContainer(); + CachingConnectionFactory cf = new CachingConnectionFactory("localhost"); + container.setConnectionFactory(cf); + container.setQueueNames("test.shutdown"); + container.setMessageListener(m -> { + }); + + container.start(); + + ScheduledExecutorService scheduledExecutorService = + TestUtils.getPropertyValue(container, "taskScheduler.scheduledExecutor", ScheduledExecutorService.class); + + container.stop(() -> { + }); + + cf.destroy(); + + assertThat(scheduledExecutorService.isShutdown()).isTrue(); + } + }