Add ability to pause/resume listener containers

Resolves #286
This commit is contained in:
Soby Chacko
2023-01-27 20:11:58 -05:00
committed by Chris Bono
parent 6fdd6c743c
commit 53bd54af4c
6 changed files with 210 additions and 5 deletions

View File

@@ -1106,6 +1106,83 @@ On retry, the container sends a new batch of messages, starting with the failed
If it fails again, it is retried, until the retries are exhausted, at which point the message is sent to the DLT.
At that point, the message is acknowledged by the container, and the listener is handed over with the subsequent messages in the original batch.
=== Pausing and Resuming Message Listener Containers
There are situations in which an application might want to pause message consumption temporarily and then resume later.
Spring for Apache Pulsar provides the ability to pause and resume the underlying message listener containers.
When the Pulsar message listener container is paused, any polling done by the container to receive data from the Pulsar consumer will be paused.
Similarly, when the container is resumed, the next poll starts returning data if the topic has any new records added while paused.
In order for applications to pause and resume the listener container, they need to inject a special bean called `PulsarListenerEndpointRegistry`.
To pause or resume a listener container, first obtain the container instance by querying the PulsarListenerEndpointRegistry bean and then invoke the pause/resume API on the instance.
Here is an example.
====
[source, java]
----
@Bean
ApplicationRunner runner1(PulsarTemplate<String> pulsarTemplate, PulsarListenerEndpointRegistry pulsarListenerEndpointRegistry) {
String topic = "hello-pulsar";
return args -> {
for (int i = 0; i < 5; i++) {
pulsarTemplate.send(topic, "This is message " + (i + 1));
};
Thread.sleep(10_000);
PulsarMessageListenerContainer listenerContainer = pulsarListenerEndpointRegistry.getListenerContainer("consumer-pause-resume-listener");
System.out.println("Pausing and sleeping for 10 seconds");
listenerContainer.pause();
Thread.sleep(10_000);
for (int i = 5; i < 10; i++) {
pulsarTemplate.send(topic, "This is message " + (i + 1));
}
Thread.sleep(10_000);
System.out.println("Slept for 10 seconds and resuming");
listenerContainer.resume();
};
}
@PulsarListener(id = "consumer-pause-resume-listener", subscriptionName = "my-subscription", topics = "hello-pulsar")
void listen1(String message) {
System.out.println(message);
}
----
====
TIP: The id parameter is the value of the `@PulsarListener` id attribute (when pausing a `@PulsarListener`).
When running the above app, you will see the following output:
```
This is message 1
This is message 2
This is message 3
This is message 4
This is message 5
Pausing and sleeping for 10 seconds
Slept for 10 seconds and resuming
This is message 6
This is message 7
This is message 8
This is message 9
This is message 10
```
Here is another way to inject the `PulsarListenerEndpointRegistry` and invoke pause on the container.
```
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
```
== Publishing and Consuming Partitioned Topics
In the following example, we publish to a topic called `hello-pulsar-partitioned`.

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,6 +64,8 @@ public non-sealed abstract class AbstractPulsarMessageListenerContainer<T> imple
private volatile boolean running = false;
private volatile boolean paused;
protected RedeliveryBackoff negativeAckRedeliveryBackoff;
protected RedeliveryBackoff ackTimeoutRedeliveryBackoff;
@@ -223,4 +225,18 @@ public non-sealed abstract class AbstractPulsarMessageListenerContainer<T> imple
this.pulsarConsumerErrorHandler = pulsarConsumerErrorHandler;
}
@Override
public void pause() {
this.paused = true;
}
@Override
public void resume() {
this.paused = false;
}
protected boolean isPaused() {
return this.paused;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -134,4 +134,20 @@ public class ConcurrentPulsarMessageListenerContainer<T> extends AbstractPulsarM
return this.containers;
}
@Override
public void pause() {
synchronized (this.lifecycleMonitor) {
super.pause();
this.containers.forEach(AbstractPulsarMessageListenerContainer::pause);
}
}
@Override
public void resume() {
synchronized (this.lifecycleMonitor) {
super.resume();
this.containers.forEach(AbstractPulsarMessageListenerContainer::resume);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -191,6 +191,24 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
}
}
@Override
public void pause() {
super.pause();
DefaultPulsarMessageListenerContainer<T>.Listener consumer = this.listenerConsumer;
if (consumer != null) {
consumer.pause();
}
}
@Override
public void resume() {
DefaultPulsarMessageListenerContainer<T>.Listener consumer = this.listenerConsumer;
if (consumer != null) {
consumer.resume();
}
super.resume();
}
private final class Listener implements SchedulingAwareRunnable {
private final PulsarRecordMessageListener<T> listener;
@@ -338,7 +356,9 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
try {
if (!inRetryMode.get() && !messagesPendingInBatch.get()) {
DefaultPulsarMessageListenerContainer.this.receiveInProgress.set(true);
messages = this.consumer.batchReceive();
if (!isPaused()) {
messages = this.consumer.batchReceive();
}
}
}
catch (PulsarClientException e) {
@@ -602,6 +622,18 @@ public class DefaultPulsarMessageListenerContainer<T> extends AbstractPulsarMess
AbstractAcknowledgement.handleAckByMessageId(this.consumer, message.getMessageId());
}
public void pause() {
if (this.consumer != null) {
this.consumer.pause();
}
}
public void resume() {
if (this.consumer != null) {
this.consumer.resume();
}
}
}
private static abstract class AbstractAcknowledgement implements Acknowledgement {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2022 the original author or authors.
* Copyright 2022-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,4 +43,19 @@ public sealed interface PulsarMessageListenerContainer
@SuppressWarnings("rawtypes")
void setPulsarConsumerErrorHandler(PulsarConsumerErrorHandler pulsarConsumerErrorHandler);
/**
* Pause this container before the next poll(). The next poll by the container will be
* disabled as long as {@link #resume()} is not called.
*/
default void pause() {
throw new UnsupportedOperationException("This container doesn't support pause");
}
/**
* Resume this container, if paused.
*/
default void resume() {
throw new UnsupportedOperationException("This container doesn't support resume");
}
}

View File

@@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -47,6 +48,7 @@ import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
@@ -1025,4 +1027,51 @@ public class PulsarListenerTests implements PulsarTestContainerSupport {
}
@Nested
@ContextConfiguration(classes = ConsumerPauseTest.ConsumerPauseConfig.class)
class ConsumerPauseTest {
private static final CountDownLatch latch = new CountDownLatch(10);
@Autowired
private PulsarListenerEndpointRegistry pulsarListenerEndpointRegistry;
@Test
void containerPauseAndResumeSuccessfully() throws Exception {
for (int i = 0; i < 3; i++) {
pulsarTemplate.send("consumer-pause-topic", "hello-" + i);
}
// wait until all 3 messages are received by the listener
Awaitility.await().timeout(Duration.ofSeconds(10)).until(() -> latch.getCount() == 7);
PulsarMessageListenerContainer container = pulsarListenerEndpointRegistry
.getListenerContainer("consumerPauseListener");
assertThat(container).isNotNull();
container.pause();
Thread.sleep(1000);
for (int i = 3; i < 10; i++) {
pulsarTemplate.send("consumer-pause-topic", "hello-" + i);
}
Thread.sleep(1000);
assertThat(latch.getCount()).isEqualTo(7);
container.resume();
// All latch must be received by now
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
}
@EnablePulsar
@Configuration
static class ConsumerPauseConfig {
@PulsarListener(id = "consumerPauseListener", subscriptionName = "consumer-pause-subscription",
topics = "consumer-pause-topic", properties = { "receiverQueueSize=1" })
void listen(String msg) {
latch.countDown();
}
}
}
}