Commit 6d396b97 authored by nklmish's avatar nklmish Committed by Phillip Webb

Add Kafka transaction support property

Add `spring.kafka.producer.transaction-id-prefix` property that will be
passed to `DefaultKafkaProducerFactory.setTransactionIdPrefix(...)`

See gh-11076
parent 8d8357e5
...@@ -38,6 +38,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; ...@@ -38,6 +38,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
/** /**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
...@@ -45,6 +46,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; ...@@ -45,6 +46,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter;
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Nakul Mishra
* @since 1.5.0 * @since 1.5.0
*/ */
@Configuration @Configuration
...@@ -94,8 +96,20 @@ public class KafkaAutoConfiguration { ...@@ -94,8 +96,20 @@ public class KafkaAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean(ProducerFactory.class) @ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() { public ProducerFactory<?, ?> kafkaProducerFactory() {
return new DefaultKafkaProducerFactory<>( DefaultKafkaProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties()); this.properties.buildProducerProperties());
KafkaProperties.Producer producer = this.properties.getProducer();
if (producer.getTransactionIdPrefix() != null) {
factory.setTransactionIdPrefix(producer.getTransactionIdPrefix());
}
return factory;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
} }
@Bean @Bean
......
...@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils; ...@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Artem Bilan * @author Artem Bilan
* @author Nakul Mishra
* @since 1.5.0 * @since 1.5.0
*/ */
@ConfigurationProperties(prefix = "spring.kafka") @ConfigurationProperties(prefix = "spring.kafka")
...@@ -519,6 +520,11 @@ public class KafkaProperties { ...@@ -519,6 +520,11 @@ public class KafkaProperties {
*/ */
private Integer retries; private Integer retries;
/**
* When non empty, enables transactional support for producer.
*/
private String transactionIdPrefix;
/** /**
* Additional producer-specific properties used to configure the client. * Additional producer-specific properties used to configure the client.
*/ */
...@@ -600,6 +606,14 @@ public class KafkaProperties { ...@@ -600,6 +606,14 @@ public class KafkaProperties {
this.retries = retries; this.retries = retries;
} }
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
public Map<String, String> getProperties() { public Map<String, String> getProperties() {
return this.properties; return this.properties;
} }
......
...@@ -48,6 +48,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; ...@@ -48,6 +48,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry; import static org.assertj.core.api.Assertions.entry;
...@@ -59,6 +60,7 @@ import static org.mockito.Mockito.mock; ...@@ -59,6 +60,7 @@ import static org.mockito.Mockito.mock;
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll * @author Stephane Nicoll
* @author Eddú Meléndez * @author Eddú Meléndez
* @author Nakul Mishra
*/ */
public class KafkaAutoConfigurationTests { public class KafkaAutoConfigurationTests {
...@@ -198,6 +200,9 @@ public class KafkaAutoConfigurationTests { ...@@ -198,6 +200,9 @@ public class KafkaAutoConfigurationTests {
assertThat( assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty(); .isEmpty();
assertThat(
context.getBeansOfType(KafkaTransactionManager.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
}); });
...@@ -256,6 +261,7 @@ public class KafkaAutoConfigurationTests { ...@@ -256,6 +261,7 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.poll-timeout=2000", "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.type=batch", "spring.kafka.listener.type=batch",
"spring.kafka.jaas.enabled=true", "spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo", "spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE", "spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true") "spring.kafka.jaas.options.useKeyTab=true")
...@@ -297,6 +303,9 @@ public class KafkaAutoConfigurationTests { ...@@ -297,6 +303,9 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo( assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo(
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(
context.getBeansOfType(KafkaTransactionManager.class))
.hasSize(1);
assertThat(((Map<String, String>) dfa.getPropertyValue("options"))) assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true")); .containsExactly(entry("useKeyTab", "true"));
}); });
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment