diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java index 1325e0ac..77bffce7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/AbstractPulsarMessageListenerContainer.java @@ -227,16 +227,28 @@ public non-sealed abstract class AbstractPulsarMessageListenerContainer imple @Override public void pause() { - this.paused = true; + synchronized (this.lifecycleMonitor) { + doPause(); + } } @Override public void resume() { - this.paused = false; + synchronized (this.lifecycleMonitor) { + doResume(); + } } protected boolean isPaused() { return this.paused; } + protected void setPaused(boolean paused) { + this.paused = paused; + } + + protected abstract void doPause(); + + protected abstract void doResume(); + } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java index 146d55a0..59d9c7cb 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/ConcurrentPulsarMessageListenerContainer.java @@ -135,17 +135,17 @@ public class ConcurrentPulsarMessageListenerContainer extends AbstractPulsarM } @Override - public void pause() { - synchronized (this.lifecycleMonitor) { - super.pause(); + public void doPause() { + if (!isPaused()) { + setPaused(true); this.containers.forEach(AbstractPulsarMessageListenerContainer::pause); } } @Override - public void resume() { - synchronized (this.lifecycleMonitor) { - super.resume(); + public void doResume() { + if (isPaused()) { + setPaused(false); this.containers.forEach(AbstractPulsarMessageListenerContainer::resume); } } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java index 7c873c91..2b5a25f7 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/listener/DefaultPulsarMessageListenerContainer.java @@ -192,8 +192,8 @@ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMess } @Override - public void pause() { - super.pause(); + public void doPause() { + setPaused(true); DefaultPulsarMessageListenerContainer.Listener consumer = this.listenerConsumer; if (consumer != null) { consumer.pause(); @@ -201,12 +201,12 @@ public class DefaultPulsarMessageListenerContainer extends AbstractPulsarMess } @Override - public void resume() { + public void doResume() { DefaultPulsarMessageListenerContainer.Listener consumer = this.listenerConsumer; if (consumer != null) { consumer.resume(); } - super.resume(); + setPaused(false); } private final class Listener implements SchedulingAwareRunnable {