Commit bb7940f8 authored by Madhura Bhave's avatar Madhura Bhave

Use BatchErrorHandler when Kafka listener type is batch

Closes gh-16499
parent ac2b0093
...@@ -24,6 +24,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; ...@@ -24,6 +24,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.MessageConverter; import org.springframework.kafka.support.converter.MessageConverter;
...@@ -48,6 +49,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -48,6 +49,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
private ErrorHandler errorHandler; private ErrorHandler errorHandler;
private BatchErrorHandler batchErrorHandler;
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor; private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
/** /**
...@@ -91,6 +94,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -91,6 +94,14 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
} }
/**
* Set the {@link BatchErrorHandler} to use.
* @param batchErrorHandler the error handler
*/
public void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
this.batchErrorHandler = batchErrorHandler;
}
/** /**
* Set the {@link AfterRollbackProcessor} to use. * Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor * @param afterRollbackProcessor the after rollback processor
...@@ -124,8 +135,13 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -124,8 +135,13 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
map.from(this.replyTemplate).to(factory::setReplyTemplate); map.from(this.replyTemplate).to(factory::setReplyTemplate);
map.from(properties::getType).whenEqualTo(Listener.Type.BATCH) map.from(properties::getType).whenEqualTo(Listener.Type.BATCH)
.toCall(() -> factory.setBatchListener(true)); .toCall(() -> factory.setBatchListener(true));
map.from(this.errorHandler).to(factory::setErrorHandler);
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor); map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
if (properties.getType().equals(Listener.Type.BATCH)) {
factory.setBatchErrorHandler(this.batchErrorHandler);
}
else {
factory.setErrorHandler(this.errorHandler);
}
} }
private void configureContainer(ContainerProperties container) { private void configureContainer(ContainerProperties container) {
......
...@@ -28,6 +28,7 @@ import org.springframework.kafka.config.KafkaListenerConfigUtils; ...@@ -28,6 +28,7 @@ import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ErrorHandler; import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter; import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
...@@ -58,6 +59,8 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -58,6 +59,8 @@ class KafkaAnnotationDrivenConfiguration {
private final ErrorHandler errorHandler; private final ErrorHandler errorHandler;
private final BatchErrorHandler batchErrorHandler;
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor; private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
KafkaAnnotationDrivenConfiguration(KafkaProperties properties, KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
...@@ -66,6 +69,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -66,6 +69,7 @@ class KafkaAnnotationDrivenConfiguration {
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate, ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager, ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
ObjectProvider<ErrorHandler> errorHandler, ObjectProvider<ErrorHandler> errorHandler,
ObjectProvider<BatchErrorHandler> batchErrorHandler,
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) { ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
this.properties = properties; this.properties = properties;
this.messageConverter = messageConverter.getIfUnique(); this.messageConverter = messageConverter.getIfUnique();
...@@ -74,6 +78,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -74,6 +78,7 @@ class KafkaAnnotationDrivenConfiguration {
this.kafkaTemplate = kafkaTemplate.getIfUnique(); this.kafkaTemplate = kafkaTemplate.getIfUnique();
this.transactionManager = kafkaTransactionManager.getIfUnique(); this.transactionManager = kafkaTransactionManager.getIfUnique();
this.errorHandler = errorHandler.getIfUnique(); this.errorHandler = errorHandler.getIfUnique();
this.batchErrorHandler = batchErrorHandler.getIfUnique();
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique(); this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
} }
...@@ -88,6 +93,7 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -88,6 +93,7 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setReplyTemplate(this.kafkaTemplate); configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager); configurer.setTransactionManager(this.transactionManager);
configurer.setErrorHandler(this.errorHandler); configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
return configurer; return configurer;
} }
......
...@@ -55,6 +55,7 @@ import org.springframework.kafka.core.KafkaTemplate; ...@@ -55,6 +55,7 @@ import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AfterRollbackProcessor; import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler; import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.BatchMessageConverter; import org.springframework.kafka.support.converter.BatchMessageConverter;
...@@ -581,6 +582,39 @@ public class KafkaAutoConfigurationTests { ...@@ -581,6 +582,39 @@ public class KafkaAutoConfigurationTests {
}); });
} }
@Test
public void concurrentKafkaListenerContainerFactoryInBatchModeShouldUseBatchErrorHandler() {
this.contextRunner.withUserConfiguration(BatchErrorHandlerConfiguration.class)
.withPropertyValues("spring.kafka.listener.type=batch").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("batchErrorHandler"));
});
}
@Test
public void concurrentKafkaListenerContainerFactoryInBatchModeWhenBatchErrorHandlerNotAvailableShouldBeNull() {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isNull();
});
}
@Test
public void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShouldBeNull() {
this.contextRunner.withPropertyValues("spring.kafka.listener.type=batch")
.withUserConfiguration(ErrorHandlerConfiguration.class).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isNull();
});
}
@Test @Test
public void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() { public void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
this.contextRunner this.contextRunner
...@@ -660,6 +694,16 @@ public class KafkaAutoConfigurationTests { ...@@ -660,6 +694,16 @@ public class KafkaAutoConfigurationTests {
} }
@Configuration(proxyBeanMethods = false)
protected static class BatchErrorHandlerConfiguration {
@Bean
public SeekToCurrentBatchErrorHandler batchErrorHandler() {
return new SeekToCurrentBatchErrorHandler();
}
}
@Configuration(proxyBeanMethods = false) @Configuration(proxyBeanMethods = false)
protected static class TransactionManagerConfiguration { protected static class TransactionManagerConfiguration {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment