GH-1134: Fix stop(Runnable) usage

Fixes https://github.com/spring-projects/spring-kafka/issues/1134

We always have to call `callback` in the `stop(Runnable)` implementation
independently of the component state

**Cherry-pick until 1.1.x to support Spring Boot 1.5.x**
This commit is contained in:
Artem Bilan
2019-06-21 13:42:09 -04:00
committed by Gary Russell
parent 1dff78818b
commit bb06d194f6
2 changed files with 16 additions and 8 deletions

View File

@@ -248,16 +248,21 @@ public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifec
@Override
public void stop(Runnable callback) {
Collection<MessageListenerContainer> listenerContainers = getListenerContainers();
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainers.size(), callback);
for (MessageListenerContainer listenerContainer : listenerContainers) {
if (listenerContainer.isRunning()) {
listenerContainer.stop(aggregatingCallback);
}
else {
aggregatingCallback.run();
Collection<MessageListenerContainer> listenerContainersToStop = getListenerContainers();
if (listenerContainersToStop.size() > 0) {
AggregatingCallback aggregatingCallback = new AggregatingCallback(listenerContainersToStop.size(), callback);
for (MessageListenerContainer listenerContainer : listenerContainersToStop) {
if (listenerContainer.isRunning()) {
listenerContainer.stop(aggregatingCallback);
}
else {
aggregatingCallback.run();
}
}
}
else {
callback.run();
}
}
@Override

View File

@@ -264,6 +264,9 @@ public abstract class AbstractMessageListenerContainer<K, V>
if (isRunning()) {
doStop(callback);
}
else {
callback.run();
}
}
}