Commit b7ae5586 authored by Stephane Nicoll's avatar Stephane Nicoll

Polish "Improve Kafka Auto-configuration"

Closes gh-14215
parent 59c6dc5c
...@@ -78,7 +78,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -78,7 +78,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* Set the {@link KafkaAwareTransactionManager} to use. * Set the {@link KafkaAwareTransactionManager} to use.
* @param transactionManager the transaction manager * @param transactionManager the transaction manager
*/ */
public void setTransactionManager( void setTransactionManager(
KafkaAwareTransactionManager<Object, Object> transactionManager) { KafkaAwareTransactionManager<Object, Object> transactionManager) {
this.transactionManager = transactionManager; this.transactionManager = transactionManager;
} }
...@@ -87,7 +87,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -87,7 +87,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* Set the {@link ErrorHandler} to use. * Set the {@link ErrorHandler} to use.
* @param errorHandler the error handler * @param errorHandler the error handler
*/ */
public void setErrorHandler(ErrorHandler errorHandler) { void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler; this.errorHandler = errorHandler;
} }
...@@ -95,7 +95,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { ...@@ -95,7 +95,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
* Set the {@link AfterRollbackProcessor} to use. * Set the {@link AfterRollbackProcessor} to use.
* @param afterRollbackProcessor the after rollback processor * @param afterRollbackProcessor the after rollback processor
*/ */
public void setAfterRollbackProcessor( void setAfterRollbackProcessor(
AfterRollbackProcessor<Object, Object> afterRollbackProcessor) { AfterRollbackProcessor<Object, Object> afterRollbackProcessor) {
this.afterRollbackProcessor = afterRollbackProcessor; this.afterRollbackProcessor = afterRollbackProcessor;
} }
......
/* /*
* Copyright 2012-2017 the original author or authors. * Copyright 2012-2018 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -75,8 +75,8 @@ class KafkaAnnotationDrivenConfiguration { ...@@ -75,8 +75,8 @@ class KafkaAnnotationDrivenConfiguration {
configurer.setKafkaProperties(this.properties); configurer.setKafkaProperties(this.properties);
configurer.setMessageConverter(this.messageConverter); configurer.setMessageConverter(this.messageConverter);
configurer.setReplyTemplate(this.kafkaTemplate); configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setErrorHandler(this.errorHandler);
configurer.setTransactionManager(this.transactionManager); configurer.setTransactionManager(this.transactionManager);
configurer.setErrorHandler(this.errorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor); configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
return configurer; return configurer;
} }
......
...@@ -59,6 +59,7 @@ import org.springframework.kafka.support.converter.MessagingMessageConverter; ...@@ -59,6 +59,7 @@ import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager; import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager; import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.PlatformTransactionManager; import org.springframework.transaction.PlatformTransactionManager;
...@@ -81,31 +82,29 @@ public class KafkaAutoConfigurationTests { ...@@ -81,31 +82,29 @@ public class KafkaAutoConfigurationTests {
@Test @Test
public void consumerProperties() { public void consumerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234",
.withPropertyValues("spring.kafka.bootstrap-servers=foo:1234", "spring.kafka.properties.foo=bar", "spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo=bar", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.properties.baz=qux", "spring.kafka.ssl.key-password=p1",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.ssl.key-store-location=classpath:ksLoc",
"spring.kafka.ssl.key-password=p1", "spring.kafka.ssl.key-store-password=p2",
"spring.kafka.ssl.key-store-location=classpath:ksLoc", "spring.kafka.ssl.key-store-type=PKCS12",
"spring.kafka.ssl.key-store-password=p2", "spring.kafka.ssl.trust-store-location=classpath:tsLoc",
"spring.kafka.ssl.key-store-type=PKCS12", "spring.kafka.ssl.trust-store-password=p3",
"spring.kafka.ssl.trust-store-location=classpath:tsLoc", "spring.kafka.ssl.trust-store-type=PKCS12",
"spring.kafka.ssl.trust-store-password=p3", "spring.kafka.ssl.protocol=TLSv1.2",
"spring.kafka.ssl.trust-store-type=PKCS12", "spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.ssl.protocol=TLSv1.2", "spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-commit-interval=123", "spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.max-poll-records=42", "spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.enable-auto-commit=false",
"spring.kafka.consumer.client-id=ccid", // test override common "spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.properties.fiz.buz=fix.fox",
"spring.kafka.consumer.fetch-max-wait=456", "spring.kafka.consumer.fetch-min-size=789",
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.group-id=bar",
"spring.kafka.consumer.fetch-min-size=789", "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
.run((context) -> { .run((context) -> {
DefaultKafkaConsumerFactory<?, ?> consumerFactory = context DefaultKafkaConsumerFactory<?, ?> consumerFactory = context
.getBean(DefaultKafkaConsumerFactory.class); .getBean(DefaultKafkaConsumerFactory.class);
...@@ -160,36 +159,30 @@ public class KafkaAutoConfigurationTests { ...@@ -160,36 +159,30 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("errorHandler"));
}); });
} }
@Test @Test
public void producerProperties() { public void producerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
.withPropertyValues("spring.kafka.clientId=cid", "spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz", "spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20",
"spring.kafka.producer.acks=all", "spring.kafka.producer.bootstrap-servers=bar:1234", // test
"spring.kafka.producer.batch-size=20", // override
"spring.kafka.producer.bootstrap-servers=bar:1234", // test "spring.kafka.producer.buffer-memory=12345",
// override "spring.kafka.producer.compression-type=gzip",
"spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer",
"spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.retries=2",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.properties.fiz.buz=fix.fox",
"spring.kafka.producer.retries=2", "spring.kafka.producer.ssl.key-password=p4",
"spring.kafka.producer.properties.fiz.buz=fix.fox", "spring.kafka.producer.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.producer.ssl.key-password=p4", "spring.kafka.producer.ssl.key-store-password=p5",
"spring.kafka.producer.ssl.key-store-location=classpath:ksLocP", "spring.kafka.producer.ssl.key-store-type=PKCS12",
"spring.kafka.producer.ssl.key-store-password=p5", "spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.producer.ssl.key-store-type=PKCS12", "spring.kafka.producer.ssl.trust-store-password=p6",
"spring.kafka.producer.ssl.trust-store-location=classpath:tsLocP", "spring.kafka.producer.ssl.trust-store-type=PKCS12",
"spring.kafka.producer.ssl.trust-store-password=p6", "spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.ssl.trust-store-type=PKCS12", "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
"spring.kafka.producer.ssl.protocol=TLSv1.2",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer")
.run((context) -> { .run((context) -> {
DefaultKafkaProducerFactory<?, ?> producerFactory = context DefaultKafkaProducerFactory<?, ?> producerFactory = context
.getBean(DefaultKafkaProducerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
...@@ -414,7 +407,7 @@ public class KafkaAutoConfigurationTests { ...@@ -414,7 +407,7 @@ public class KafkaAutoConfigurationTests {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void listenerProperties() { public void listenerProperties() {
this.contextRunner.withUserConfiguration(TestConfiguration.class) this.contextRunner
.withPropertyValues("spring.kafka.template.default-topic=testTopic", .withPropertyValues("spring.kafka.template.default-topic=testTopic",
"spring.kafka.listener.ack-mode=MANUAL", "spring.kafka.listener.ack-mode=MANUAL",
"spring.kafka.listener.client-id=client", "spring.kafka.listener.client-id=client",
...@@ -506,7 +499,6 @@ public class KafkaAutoConfigurationTests { ...@@ -506,7 +499,6 @@ public class KafkaAutoConfigurationTests {
@Test @Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() { public void testConcurrentKafkaListenerContainerFactoryWithCustomMessageConverters() {
this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class) this.contextRunner.withUserConfiguration(MessageConverterConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> { .run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class); .getBean(ConcurrentKafkaListenerContainerFactory.class);
...@@ -514,11 +506,56 @@ public class KafkaAutoConfigurationTests { ...@@ -514,11 +506,56 @@ public class KafkaAutoConfigurationTests {
kafkaListenerContainerFactory); kafkaListenerContainerFactory);
assertThat(dfa.getPropertyValue("messageConverter")) assertThat(dfa.getPropertyValue("messageConverter"))
.isSameAs(context.getBean("myMessageConverter")); .isSameAs(context.getBean("myMessageConverter"));
assertThat(kafkaListenerContainerFactory.getContainerProperties() });
.getTransactionManager()).isSameAs( }
context.getBean("chainedTransactionManager"));
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomErrorHandler() {
this.contextRunner.withUserConfiguration(ErrorHandlerConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(KafkaTestUtils.getPropertyValue(factory, "errorHandler"))
.isSameAs(context.getBean("errorHandler"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
this.contextRunner
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
assertThat(context).hasSingleBean(KafkaAwareTransactionManager.class);
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getTransactionManager())
.isSameAs(
context.getBean(KafkaAwareTransactionManager.class));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomTransactionManager() {
this.contextRunner.withUserConfiguration(TransactionManagerConfiguration.class)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test")
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getTransactionManager())
.isSameAs(context.getBean("chainedTransactionManager"));
});
}
@Test
public void testConcurrentKafkaListenerContainerFactoryWithCustomAfterRollbackProcessor() {
this.contextRunner
.withUserConfiguration(AfterRollbackProcessorConfiguration.class)
.run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
DirectFieldAccessor dfa = new DirectFieldAccessor(factory);
assertThat(dfa.getPropertyValue("afterRollbackProcessor")) assertThat(dfa.getPropertyValue("afterRollbackProcessor"))
.isSameAs(context.getBean("arp")); .isSameAs(context.getBean("afterRollbackProcessor"));
}); });
} }
...@@ -535,34 +572,43 @@ public class KafkaAutoConfigurationTests { ...@@ -535,34 +572,43 @@ public class KafkaAutoConfigurationTests {
} }
@Configuration @Configuration
protected static class TestConfiguration { protected static class MessageConverterConfiguration {
@Bean @Bean
public SeekToCurrentErrorHandler errorHandler() { public RecordMessageConverter myMessageConverter() {
return new SeekToCurrentErrorHandler(); return mock(RecordMessageConverter.class);
} }
} }
@Configuration @Configuration
protected static class MessageConverterConfiguration { protected static class ErrorHandlerConfiguration {
@Bean @Bean
public RecordMessageConverter myMessageConverter() { public SeekToCurrentErrorHandler errorHandler() {
return mock(RecordMessageConverter.class); return new SeekToCurrentErrorHandler();
} }
}
@Configuration
protected static class TransactionManagerConfiguration {
@Bean @Bean
@Primary @Primary
public PlatformTransactionManager chainedTransactionManager( public PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager<String, String> kafkaTransactionManager) { KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>( return new ChainedKafkaTransactionManager<String, String>(
kafkaTransactionManager); kafkaTransactionManager);
} }
}
@Configuration
protected static class AfterRollbackProcessorConfiguration {
@Bean @Bean
public AfterRollbackProcessor<Object, Object> arp() { public AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
return (records, consumer, ex, recoverable) -> { return (records, consumer, ex, recoverable) -> {
// no-op // no-op
}; };
......
...@@ -5628,18 +5628,6 @@ When the Apache Kafka infrastructure is present, any bean can be annotated with ...@@ -5628,18 +5628,6 @@ When the Apache Kafka infrastructure is present, any bean can be annotated with
`@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has `@KafkaListener` to create a listener endpoint. If no `KafkaListenerContainerFactory` has
been defined, a default one is automatically configured with keys defined in been defined, a default one is automatically configured with keys defined in
`spring.kafka.listener.*`. `spring.kafka.listener.*`.
If the property `spring.kafka.producer.transaction-id-prefix` is defined, a
`KafkaTransactionManager` will be auto-configured with name `kafkaTransactionManager` and
will be wired into the container factory.
Also, if `RecordMessageConverter`, `ErrorHandler` and/or
`AfterRollbackProcessor` beans are defined, they are automatically associated to the
default factory.
IMPORTANT: The auto configuration of these beans occur if there is just a single
instance.
When using a `ChainedKafkaTransactionManager`, it will usually reference the configured
`KafkaTransactionManager` bean, so the chained manager must be marked
`@Primary` if you want it wired into the container factory.
The following component creates a listener endpoint on the `someTopic` topic: The following component creates a listener endpoint on the `someTopic` topic:
...@@ -5656,6 +5644,16 @@ The following component creates a listener endpoint on the `someTopic` topic: ...@@ -5656,6 +5644,16 @@ The following component creates a listener endpoint on the `someTopic` topic:
} }
---- ----
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
container factory. Similarly, if a `RecordMessageConverter`, `ErrorHandler` or
`AfterRollbackProcessor` bean is defined, it is automatically associated to the default
factory.
TIP: A custom `ChainedKafkaTransactionManager` must be marked `@Primary` as it usually
reference the auto-configured `KafkaTransactionManager` bean.
[[boot-features-kafka-streams]] [[boot-features-kafka-streams]]
==== Kafka Streams ==== Kafka Streams
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment