Commit 3be667cf authored by Phillip Webb's avatar Phillip Webb

Merge pull request #11076 from nklmish/issue-11067

* pr/11076:
  Polish Kafka transaction support property
  Add Kafka transaction support property
parents 8d8357e5 4c29c35c
......@@ -38,6 +38,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.KafkaTransactionManager;
/**
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
......@@ -45,6 +46,7 @@ import org.springframework.kafka.support.converter.RecordMessageConverter;
* @author Gary Russell
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
* @since 1.5.0
*/
@Configuration
......@@ -94,8 +96,22 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public ProducerFactory<?, ?> kafkaProducerFactory() {
return new DefaultKafkaProducerFactory<>(
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(
this.properties.buildProducerProperties());
String transactionIdPrefix = this.properties.getProducer()
.getTransactionIdPrefix();
if (transactionIdPrefix != null) {
factory.setTransactionIdPrefix(transactionIdPrefix);
}
return factory;
}
@Bean
@ConditionalOnProperty(name = "spring.kafka.producer.transaction-id-prefix")
@ConditionalOnMissingBean
public KafkaTransactionManager<?, ?> kafkaTransactionManager(
ProducerFactory<?, ?> producerFactory) {
return new KafkaTransactionManager<>(producerFactory);
}
@Bean
......
......@@ -45,6 +45,7 @@ import org.springframework.util.CollectionUtils;
* @author Gary Russell
* @author Stephane Nicoll
* @author Artem Bilan
* @author Nakul Mishra
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
......@@ -519,6 +520,11 @@ public class KafkaProperties {
*/
private Integer retries;
/**
* When non empty, enables transaction support for producer.
*/
private String transactionIdPrefix;
/**
* Additional producer-specific properties used to configure the client.
*/
......@@ -600,6 +606,14 @@ public class KafkaProperties {
this.retries = retries;
}
public String getTransactionIdPrefix() {
return this.transactionIdPrefix;
}
public void setTransactionIdPrefix(String transactionIdPrefix) {
this.transactionIdPrefix = transactionIdPrefix;
}
public Map<String, String> getProperties() {
return this.properties;
}
......
......@@ -48,6 +48,7 @@ import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
......@@ -59,6 +60,7 @@ import static org.mockito.Mockito.mock;
* @author Gary Russell
* @author Stephane Nicoll
* @author Eddú Meléndez
* @author Nakul Mishra
*/
public class KafkaAutoConfigurationTests {
......@@ -198,6 +200,8 @@ public class KafkaAutoConfigurationTests {
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
assertThat(context.getBeansOfType(KafkaTransactionManager.class))
.isEmpty();
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
});
......@@ -256,6 +260,7 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.poll-timeout=2000",
"spring.kafka.listener.type=batch",
"spring.kafka.jaas.enabled=true",
"spring.kafka.producer.transaction-id-prefix=foo",
"spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true")
......@@ -297,6 +302,8 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag")).isEqualTo(
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(context.getBeansOfType(KafkaTransactionManager.class))
.hasSize(1);
assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true"));
});
......
......@@ -998,6 +998,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.producer.ssl.keystore-password= # Store password for the key store file.
spring.kafka.producer.ssl.truststore-location= # Location of the trust store file.
spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file.
spring.kafka.producer.transaction-id-prefix= # When non empty, enables transaction support for producer.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment