From 3b060d833d9b813de5156d74664099028ab3a487 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Thu, 28 Sep 2017 08:50:17 -0400 Subject: [PATCH] GH-439: Container Lifecycle Fixes Fixes https://github.com/spring-projects/spring-kafka/issues/439 Ignore `stop()` if container is not running, `start()` if container is running. __cherry-pick to all branches__ * Use `doStop(Runnable)` optimization from the `stop()` # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java --- .../AbstractMessageListenerContainer.java | 38 +++++++++++-------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 25d719fd..c03ecf11 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -196,10 +196,12 @@ public abstract class AbstractMessageListenerContainer @Override public final void start() { synchronized (this.lifecycleMonitor) { - Assert.isTrue( - this.containerProperties.getMessageListener() instanceof KafkaDataListener, - "A " + KafkaDataListener.class.getName() + " implementation must be provided"); - doStart(); + if (!isRunning()) { + Assert.isTrue( + this.containerProperties.getMessageListener() instanceof GenericMessageListener, + "A " + GenericMessageListener.class.getName() + " implementation must be provided"); + doStart(); + } } } @@ -207,24 +209,30 @@ public abstract class AbstractMessageListenerContainer @Override public final void stop() { - final CountDownLatch latch = new CountDownLatch(1); - stop(new Runnable() { - @Override - public void run() { - latch.countDown(); + synchronized (this.lifecycleMonitor) { + if (isRunning()) { + final CountDownLatch latch = new CountDownLatch(1); + doStop(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + try { + latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) { + } } - }); - try { - latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); - } - catch (InterruptedException e) { } } @Override public void stop(Runnable callback) { synchronized (this.lifecycleMonitor) { - doStop(callback); + if (isRunning()) { + doStop(callback); + } } }