GH-439: Container Lifecycle Fixes

Fixes https://github.com/spring-projects/spring-kafka/issues/439

Ignore `stop()` if container is not running, `start()` if container is running.

__cherry-pick to all branches__

* Use `doStop(Runnable)` optimization from the `stop()`

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java
#	spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
This commit is contained in:
Gary Russell
2017-09-28 08:50:17 -04:00
committed by Artem Bilan
parent f3ce1e172d
commit 3b060d833d

View File

@@ -196,10 +196,12 @@ public abstract class AbstractMessageListenerContainer<K, V>
@Override
public final void start() {
synchronized (this.lifecycleMonitor) {
Assert.isTrue(
this.containerProperties.getMessageListener() instanceof KafkaDataListener,
"A " + KafkaDataListener.class.getName() + " implementation must be provided");
doStart();
if (!isRunning()) {
Assert.isTrue(
this.containerProperties.getMessageListener() instanceof GenericMessageListener,
"A " + GenericMessageListener.class.getName() + " implementation must be provided");
doStart();
}
}
}
@@ -207,24 +209,30 @@ public abstract class AbstractMessageListenerContainer<K, V>
@Override
public final void stop() {
final CountDownLatch latch = new CountDownLatch(1);
stop(new Runnable() {
@Override
public void run() {
latch.countDown();
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
final CountDownLatch latch = new CountDownLatch(1);
doStop(new Runnable() {
@Override
public void run() {
latch.countDown();
}
});
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
}
}
});
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
}
}
@Override
public void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
doStop(callback);
if (isRunning()) {
doStop(callback);
}
}
}