From bb06d194f6d10e193235ac9aa720be8cc2aecbcc Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Fri, 21 Jun 2019 13:42:09 -0400 Subject: [PATCH] GH-1134: Fix `stop(Runnable)` usage Fixes https://github.com/spring-projects/spring-kafka/issues/1134 We always have to call `callback` in the `stop(Runnable)` implementation independently of the component state **Cherry-pick until 1.1.x to support Spring Boot 1.5.x** --- .../config/KafkaListenerEndpointRegistry.java | 21 ++++++++++++------- .../AbstractMessageListenerContainer.java | 3 +++ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java index d55a0fd3..e83c8cae 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java @@ -248,16 +248,21 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec @Override public void stop(Runnable callback) { - Collection listenerContainers = getListenerContainers(); - AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback); - for (MessageListenerContainer listenerContainer : listenerContainers) { - if (listenerContainer.isRunning()) { - listenerContainer.stop(aggregatingCallback); - } - else { - aggregatingCallback.run(); + Collection listenerContainersToStop = getListenerContainers(); + if (listenerContainersToStop.size() > 0) { + AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback); + for (MessageListenerContainer listenerContainer : listenerContainersToStop) { + if (listenerContainer.isRunning()) { + listenerContainer.stop(aggregatingCallback); + } + else { + aggregatingCallback.run(); + } } } + else { + callback.run(); + } } @Override diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index ca84cdaa..7d50e3e1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -264,6 +264,9 @@ public abstract class AbstractMessageListenerContainer if (isRunning()) { doStop(callback); } + else { + callback.run(); + } } }