Commit bc90d48f authored by Stephane Nicoll's avatar Stephane Nicoll

Merge pull request #16755 from garyrussell

* pr/16755:
  Polish "Auto-configure Kafka listener container with rebalance listener"
  Auto-configure Kafka listener container with rebalance listener
parents 0635d86c 74208bb1
...@@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory; ...@@ -25,6 +25,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
...@@ -47,6 +48,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -47,6 +48,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private KafkaAwareTransactionManager<Object, Object> transactionManager; private KafkaAwareTransactionManager<Object, Object> transactionManager;
private ConsumerAwareRebalanceListener rebalanceListener;
private ErrorHandler errorHandler; private ErrorHandler errorHandler;
private BatchErrorHandler batchErrorHandler; private BatchErrorHandler batchErrorHandler;
...@@ -86,6 +89,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -86,6 +89,15 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
} }
/**
* Set the {@link ConsumerAwareRebalanceListener} to use.
* @param rebalanceListener the rebalance listener.
* @since 2.2
*/
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
this.rebalanceListener = rebalanceListener;
}
/** /**
* Set the {@link ErrorHandler} to use. * Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler * @param errorHandler the error handler
...@@ -160,6 +172,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -160,6 +172,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig); map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal); map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
map.from(this.transactionManager).to(container::setTransactionManager); map.from(this.transactionManager).to(container::setTransactionManager);
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
} }
} }
...@@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory; ...@@ -29,6 +29,7 @@ import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler; import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
...@@ -57,6 +58,8 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -57,6 +58,8 @@ class KafkaAnnotationDrivenConfiguration {
private final KafkaAwareTransactionManager<Object, Object> transactionManager; private final KafkaAwareTransactionManager<Object, Object> transactionManager;
private final ConsumerAwareRebalanceListener rebalanceListener;
private final ErrorHandler errorHandler; private final ErrorHandler errorHandler;
private final BatchErrorHandler batchErrorHandler; private final BatchErrorHandler batchErrorHandler;
...@@ -68,6 +71,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -68,6 +71,7 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<BatchMessageConverter> batchMessageConverter, ObjectProvider<BatchMessageConverter> batchMessageConverter,
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate, ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager, ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
ObjectProvider<ErrorHandler> errorHandler, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler, ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) { ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
...@@ -77,6 +81,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -77,6 +81,7 @@ class KafkaAnnotationDrivenConfiguration {
() -> new BatchMessagingMessageConverter(this.messageConverter)); () -> new BatchMessagingMessageConverter(this.messageConverter));
this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique(); this.transactionManager = kafkaTransactionManager.getIfUnique();
this.rebalanceListener = rebalanceListener.getIfUnique();
this.errorHandler = errorHandler.getIfUnique(); this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique(); this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
...@@ -92,6 +97,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -92,6 +97,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setMessageConverter(messageConverterToUse); configurer.setMessageConverter(messageConverterToUse);
configurer.setReplyTemplate(this.kafkaTemplate); configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager); configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setErrorHandler(this.errorHandler); configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler); configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
......
...@@ -55,6 +55,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory; ...@@ -55,6 +55,7 @@ import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin; import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler; import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
...@@ -674,6 +675,18 @@ public class KafkaAutoConfigurationTests { ...@@ -674,6 +675,18 @@ public class KafkaAutoConfigurationTests {
}); });
} }
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties())
.hasFieldOrPropertyWithValue("consumerRebalanceListener",
context.getBean("rebalanceListener"));
});
}
@Test @Test
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() { public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
this.contextRunner.run((context) -> { this.contextRunner.run((context) -> {
...@@ -749,6 +762,16 @@ public class KafkaAutoConfigurationTests { ...@@ -749,6 +762,16 @@ public class KafkaAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
protected static class RebalanceListenerConfiguration {
@Bean
public ConsumerAwareRebalanceListener rebalanceListener() {
return mock(ConsumerAwareRebalanceListener.class);
}
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
@EnableKafkaStreams @EnableKafkaStreams
protected static class EnableKafkaStreamsConfiguration { protected static class EnableKafkaStreamsConfiguration {
......
...@@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic: ...@@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic:
---- ----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is container factory. Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or
defined, it is automatically associated to the default factory. `ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the
default factory.
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean
is associated to the default factory. If only a `RecordMessageConverter` bean is present is associated to the default factory. If only a `RecordMessageConverter` bean is present
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment