GH-439: Container Lifecycle Fixes
Fixes https://github.com/spring-projects/spring-kafka/issues/439 Ignore `stop()` if container is not running, `start()` if container is running. __cherry-pick to all branches__ * Use `doStop(Runnable)` optimization from the `stop()` # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
This commit is contained in:
committed by
Artem Bilan
parent
f3ce1e172d
commit
3b060d833d
@@ -196,10 +196,12 @@ public abstract class AbstractMessageListenerContainer<K, V>
|
||||
@Override
|
||||
public final void start() {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
Assert.isTrue(
|
||||
this.containerProperties.getMessageListener() instanceof KafkaDataListener,
|
||||
"A " + KafkaDataListener.class.getName() + " implementation must be provided");
|
||||
doStart();
|
||||
if (!isRunning()) {
|
||||
Assert.isTrue(
|
||||
this.containerProperties.getMessageListener() instanceof GenericMessageListener,
|
||||
"A " + GenericMessageListener.class.getName() + " implementation must be provided");
|
||||
doStart();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -207,24 +209,30 @@ public abstract class AbstractMessageListenerContainer<K, V>
|
||||
|
||||
@Override
|
||||
public final void stop() {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
stop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
if (isRunning()) {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
doStop(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
try {
|
||||
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
});
|
||||
try {
|
||||
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(Runnable callback) {
|
||||
synchronized (this.lifecycleMonitor) {
|
||||
doStop(callback);
|
||||
if (isRunning()) {
|
||||
doStop(callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user