Add Kafka shared consumer container support

- New AbstractShareKafkaMessageListenerContainer base class with lifecycle management
- ShareKafkaMessageListenerContainer implementation for share consumer protocol
- Integration tests for end-to-end message delivery validation

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
This commit is contained in:
Soby Chacko
2025-06-06 12:27:22 -04:00
committed by GitHub
parent 045ea9a5c6
commit 5aa4175798
3 changed files with 615 additions and 0 deletions

View File

@@ -0,0 +1,247 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
/**
* Abstract base class for share consumer message listener containers.
* <p>
* Handles common lifecycle, configuration, and event publishing for containers using a
* {@link org.springframework.kafka.core.ShareConsumerFactory}.
* <p>
* Subclasses are responsible for implementing the actual consumer loop and message dispatch logic.
*
* @param <K> the key type
* @param <V> the value type
*
* @author Soby Chacko
* @since 4.0
*/
public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
ApplicationContextAware {
/**
* The default {@link org.springframework.context.SmartLifecycle} phase for listener containers.
*/
public static final int DEFAULT_PHASE = Integer.MAX_VALUE - 100;
protected final ShareConsumerFactory<K, V> shareConsumerFactory;
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
private final ContainerProperties containerProperties;
protected final ReentrantLock lifecycleLock = new ReentrantLock();
private String beanName = "noBeanNameSet";
@Nullable
private ApplicationEventPublisher applicationEventPublisher;
private boolean autoStartup = true;
private int phase = DEFAULT_PHASE;
@Nullable
private ApplicationContext applicationContext;
private volatile boolean running = false;
/**
* Construct an instance with the provided factory and properties.
* @param shareConsumerFactory the factory.
* @param containerProperties the properties.
*/
@SuppressWarnings("unchecked")
protected AbstractShareKafkaMessageListenerContainer(@Nullable ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
ContainerProperties containerProperties) {
Assert.notNull(containerProperties, "'containerProperties' cannot be null");
Assert.notNull(shareConsumerFactory, "'shareConsumerFactory' cannot be null");
this.shareConsumerFactory = (ShareConsumerFactory<K, V>) shareConsumerFactory;
@Nullable String @Nullable [] topics = containerProperties.getTopics();
if (topics != null) {
this.containerProperties = new ContainerProperties(topics);
}
else {
Pattern topicPattern = containerProperties.getTopicPattern();
if (topicPattern != null) {
this.containerProperties = new ContainerProperties(topicPattern);
}
else {
@Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null) {
this.containerProperties = new ContainerProperties(topicPartitions);
}
else {
throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
}
}
}
BeanUtils.copyProperties(containerProperties, this.containerProperties,
"topics", "topicPartitions", "topicPattern");
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Nullable
public ApplicationContext getApplicationContext() {
return this.applicationContext;
}
@Override
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Return the bean name.
* @return the bean name
*/
public String getBeanName() {
return this.beanName;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
/**
* Get the event publisher.
* @return the publisher
*/
@Nullable
public ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}
@Override
public boolean isAutoStartup() {
return this.autoStartup;
}
@Override
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
@Override
public int getPhase() {
return this.phase;
}
public void setPhase(int phase) {
this.phase = phase;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
this.lifecycleLock.lock();
try {
if (!isRunning()) {
Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
doStart();
}
}
finally {
this.lifecycleLock.unlock();
}
}
@Override
public void stop() {
this.lifecycleLock.lock();
try {
if (isRunning()) {
doStop();
}
}
finally {
this.lifecycleLock.unlock();
}
}
@Override
public boolean isRunning() {
return this.running;
}
protected void setRunning(boolean running) {
this.running = running;
}
@Override
public ContainerProperties getContainerProperties() {
return this.containerProperties;
}
@Override
@Nullable
public String getGroupId() {
return this.containerProperties.getGroupId() == null
? (String) this.shareConsumerFactory.getConfigurationProperties().get(ConsumerConfig.GROUP_ID_CONFIG)
: this.containerProperties.getGroupId();
}
@Override
public String getListenerId() {
return this.beanName; // the container factory sets the bean name to the id attribute
}
@Override
public void setupMessageListener(Object messageListener) {
this.containerProperties.setMessageListener(messageListener);
}
protected abstract void doStart();
protected abstract void doStop();
@Override
public void destroy() {
stop();
}
}

View File

@@ -0,0 +1,243 @@
/*
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.jspecify.annotations.Nullable;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.util.Assert;
/**
* {@code ShareKafkaMessageListenerContainer} is a message listener container for Kafka's share consumer model.
* <p>
* This container manages a single-threaded consumer loop using a {@link org.springframework.kafka.core.ShareConsumerFactory}.
* It is designed for use cases where Kafka's cooperative sharing protocol is desired, and provides a simple polling loop
* with per-record dispatch and acknowledgement.
* <p>
* Lifecycle events are published for consumer starting and started. The container supports direct setting of the client.id.
*
* @param <K> the key type
* @param <V> the value type
*
* @author Soby Chacko
* @since 4.0
*/
public class ShareKafkaMessageListenerContainer<K, V>
extends AbstractShareKafkaMessageListenerContainer<K, V> {
private static final int POLL_TIMEOUT = 1000;
@Nullable
private String clientId;
@SuppressWarnings("NullAway.Init")
private volatile ShareListenerConsumer listenerConsumer;
@SuppressWarnings("NullAway.Init")
private volatile CompletableFuture<Void> listenerConsumerFuture;
private volatile CountDownLatch startLatch = new CountDownLatch(1);
/**
* Construct an instance with the supplied configuration properties.
* @param shareConsumerFactory the share consumer factory
* @param containerProperties the container properties
*/
public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
ContainerProperties containerProperties) {
super(shareConsumerFactory, containerProperties);
Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
}
/**
* Set the {@code client.id} to use for the consumer.
* @param clientId the client id to set
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Get the {@code client.id} for the consumer.
* @return the client id, or null if not set
*/
@Nullable
public String getClientId() {
return this.clientId;
}
@Override
public boolean isInExpectedState() {
return isRunning();
}
@Override
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
ShareListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;
if (listenerConsumerForMetrics != null) {
Map<MetricName, ? extends Metric> metrics = listenerConsumerForMetrics.consumer.metrics();
return Collections.singletonMap(listenerConsumerForMetrics.getClientId(), metrics);
}
return Collections.emptyMap();
}
@Override
protected void doStart() {
if (isRunning()) {
return;
}
ContainerProperties containerProperties = getContainerProperties();
Object messageListener = containerProperties.getMessageListener();
AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
if (consumerExecutor == null) {
consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setListenerTaskExecutor(consumerExecutor);
}
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
Assert.state(listener != null, "'messageListener' cannot be null");
this.listenerConsumer = new ShareListenerConsumer(listener);
setRunning(true);
this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor);
}
@Override
protected void doStop() {
setRunning(false);
// The consumer will exit its loop naturally when running becomes false.
}
private void publishConsumerStartingEvent() {
this.startLatch.countDown();
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerStartingEvent(this, this));
}
}
private void publishConsumerStartedEvent() {
ApplicationEventPublisher publisher = getApplicationEventPublisher();
if (publisher != null) {
publisher.publishEvent(new ConsumerStartedEvent(this, this));
}
}
/**
* The inner share consumer thread: polls for records and dispatches to the listener.
*/
private class ShareListenerConsumer implements Runnable {
private final LogAccessor logger = ShareKafkaMessageListenerContainer.this.logger;
private final ShareConsumer<K, V> consumer;
private final GenericMessageListener<?> genericListener;
private final @Nullable String consumerGroupId = ShareKafkaMessageListenerContainer.this.getGroupId();
private final @Nullable String clientId;
ShareListenerConsumer(GenericMessageListener<?> listener) {
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
ShareKafkaMessageListenerContainer.this.getGroupId(),
ShareKafkaMessageListenerContainer.this.getClientId());
this.genericListener = listener;
this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
// Subscribe to topics, just like in the test
ContainerProperties containerProperties = getContainerProperties();
this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
}
@Nullable
String getClientId() {
return this.clientId;
}
@Override
public void run() {
initialize();
Throwable exitThrowable = null;
while (isRunning()) {
try {
var records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT));
if (records != null && records.count() > 0) {
for (var record : records) {
@SuppressWarnings("unchecked")
GenericMessageListener<Object> listener = (GenericMessageListener<Object>) this.genericListener;
listener.onMessage(record);
// Temporarily auto-acknowledge and commit.
// We will refactor it later on to support more production-like scenarios.
this.consumer.acknowledge(record, AcknowledgeType.ACCEPT);
this.consumer.commitSync();
}
}
}
catch (Error e) {
this.logger.error(e, "Stopping share consumer due to an Error");
wrapUp();
throw e;
}
catch (Exception e) {
if (e.getCause() instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
this.logger.error(e, "Error in share consumer poll loop");
exitThrowable = e;
break;
}
}
if (exitThrowable != null) {
this.logger.error(exitThrowable, "ShareListenerConsumer exiting due to error");
}
wrapUp();
}
protected void initialize() {
publishConsumerStartingEvent();
publishConsumerStartedEvent();
}
private void wrapUp() {
this.consumer.close();
this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
}
@Override
public String toString() {
return "ShareKafkaMessageListenerContainer.ShareListenerConsumer ["
+ "consumerGroupId=" + this.consumerGroupId
+ ", clientId=" + this.clientId
+ "]";
}
}
}

View File

@@ -0,0 +1,125 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.kafka.listener;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.DefaultShareConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Basic tests for {@link ShareKafkaMessageListenerContainer}.
*
* @author Soby Chacko
* @since 4.0
*/
@EmbeddedKafka(
topics = {"share-listener-integration-test"}, partitions = 1,
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
"share.coordinator.state.topic.replication.factor=1",
"share.coordinator.state.topic.min.isr=1"
}
)
class ShareKafkaMessageListenerContainerIntegrationTests {
@Test
void integrationTestShareKafkaMessageListenerContainer(EmbeddedKafkaBroker broker) throws Exception {
final String topic = "share-listener-integration-test";
final String groupId = "shareListenerGroup";
String bootstrapServers = broker.getBrokersAsString();
// Produce a record
var producerProps = new java.util.Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
try (var producer = new KafkaProducer<String, String>(producerProps)) {
producer.send(new ProducerRecord<>(topic, "key", "integration-test-value")).get();
}
setShareAutoOffsetResetEarliest(bootstrapServers, groupId);
var consumerProps = new java.util.HashMap<String, Object>();
consumerProps.put("bootstrap.servers", bootstrapServers);
consumerProps.put("key.deserializer", StringDeserializer.class);
consumerProps.put("value.deserializer", StringDeserializer.class);
consumerProps.put("group.id", groupId);
DefaultShareConsumerFactory<String, String> consumerFactory = new DefaultShareConsumerFactory<>(consumerProps);
ContainerProperties containerProps = new ContainerProperties(topic);
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<String> received = new AtomicReference<>();
containerProps.setMessageListener((MessageListener<String, String>) record -> {
received.set(record.value());
latch.countDown();
});
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(consumerFactory, containerProps);
container.setBeanName("integrationTestShareKafkaMessageListenerContainer");
container.start();
try {
assertThat(latch.await(10, java.util.concurrent.TimeUnit.SECONDS)
&& "integration-test-value".equals(received.get()))
.as("Message should be received and have expected value")
.isTrue();
}
finally {
container.stop();
}
}
/**
* Sets the share.auto.offset.reset group config to earliest for the given groupId,
* using the provided bootstrapServers.
*/
private static void setShareAutoOffsetResetEarliest(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProperties = new HashMap<>();
adminProperties.put("bootstrap.servers", bootstrapServers);
ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", "earliest");
AlterConfigOp op = new AlterConfigOp(entry, AlterConfigOp.OpType.SET);
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
new ConfigResource(ConfigResource.Type.GROUP, groupId), List.of(op));
try (Admin admin = AdminClient.create(adminProperties)) {
admin.incrementalAlterConfigs(configs).all().get();
}
}
}