Pause/Resume further improvements (#298)

- Move pause/resume operation to abstract container
- Only pause/resume if container in proper state

Resovles #296
This commit is contained in:
Soby Chacko
2023-01-30 16:39:48 -05:00
committed by GitHub
parent 280637f98f
commit 224eb0b3e2
3 changed files with 24 additions and 12 deletions

View File

@@ -227,16 +227,28 @@ public non-sealed abstract class AbstractPulsarMessageListenerContainer<T> imple
@Override
public void pause() {
this.paused = true;
synchronized (this.lifecycleMonitor) {
doPause();
}
}
@Override
public void resume() {
this.paused = false;
synchronized (this.lifecycleMonitor) {
doResume();
}
}
protected boolean isPaused() {
return this.paused;
}
protected void setPaused(boolean paused) {
this.paused = paused;
}
protected abstract void doPause();
protected abstract void doResume();
}

View File

@@ -135,17 +135,17 @@ public class ConcurrentPulsarMessageListenerContainer<T> extends AbstractPulsarM
}
@Override
public void pause() {
synchronized (this.lifecycleMonitor) {
super.pause();
public void doPause() {
if (!isPaused()) {
setPaused(true);
this.containers.forEach(AbstractPulsarMessageListenerContainer::pause);
}
}
@Override
public void resume() {
synchronized (this.lifecycleMonitor) {
super.resume();
public void doResume() {
if (isPaused()) {
setPaused(false);
this.containers.forEach(AbstractPulsarMessageListenerContainer::resume);
}
}

View File

@@ -192,8 +192,8 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
}
@Override
public void pause() {
super.pause();
public void doPause() {
setPaused(true);
DefaultPulsarMessageListenerContainer<T>.Listener consumer = this.listenerConsumer;
if (consumer != null) {
consumer.pause();
@@ -201,12 +201,12 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
}
@Override
public void resume() {
public void doResume() {
DefaultPulsarMessageListenerContainer<T>.Listener consumer = this.listenerConsumer;
if (consumer != null) {
consumer.resume();
}
super.resume();
setPaused(false);
}
private final class Listener implements SchedulingAwareRunnable {