Commit c4cfc4dd authored by Gary Russell's avatar Gary Russell Committed by Stephane Nicoll

Add Kafka Kerberos Configuration Properties

See gh-9151
parent f79b4e0d
...@@ -16,9 +16,13 @@ ...@@ -16,9 +16,13 @@
package org.springframework.boot.autoconfigure.kafka; package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
...@@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; ...@@ -28,6 +32,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener; import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener; import org.springframework.kafka.support.ProducerListener;
...@@ -81,4 +86,20 @@ public class KafkaAutoConfiguration { ...@@ -81,4 +86,20 @@ public class KafkaAutoConfiguration {
this.properties.buildProducerProperties()); this.properties.buildProducerProperties());
} }
@Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class)
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
Jaas jaasProperties = this.properties.getJaas();
if (jaasProperties.getControlFlag() != null) {
jaas.setControlFlag(jaasProperties.getControlFlag());
}
if (jaasProperties.getLoginModule() != null) {
jaas.setLoginModule(jaasProperties.getLoginModule());
}
jaas.setOptions(jaasProperties.getOptions());
return jaas;
}
} }
...@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer; ...@@ -33,6 +33,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
/** /**
...@@ -74,6 +75,8 @@ public class KafkaProperties { ...@@ -74,6 +75,8 @@ public class KafkaProperties {
private final Ssl ssl = new Ssl(); private final Ssl ssl = new Ssl();
private final Jaas jaas = new Jaas();
private final Template template = new Template(); private final Template template = new Template();
public List<String> getBootstrapServers() { public List<String> getBootstrapServers() {
...@@ -116,6 +119,10 @@ public class KafkaProperties { ...@@ -116,6 +119,10 @@ public class KafkaProperties {
return this.ssl; return this.ssl;
} }
public Jaas getJaas() {
return this.jaas;
}
public Template getTemplate() { public Template getTemplate() {
return this.template; return this.template;
} }
...@@ -776,4 +783,63 @@ public class KafkaProperties { ...@@ -776,4 +783,63 @@ public class KafkaProperties {
} }
public static class Jaas {
/**
* Enable JAAS configuration.
*/
private boolean enabled;
/**
* Login module.
*/
private String loginModule;
/**
* AppConfigurationEntry.LoginModuleControlFlag value.
*/
private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag =
KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
/**
* Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'.
*/
private final Map<String, String> options = new HashMap<>();
public boolean isEnabled() {
return this.enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public String getLoginModule() {
return this.loginModule;
}
public void setLoginModule(String loginModule) {
this.loginModule = loginModule;
}
public KafkaJaasLoginModuleInitializer.ControlFlag getControlFlag() {
return this.controlFlag;
}
public void setControlFlag(KafkaJaasLoginModuleInitializer.ControlFlag controlFlag) {
this.controlFlag = controlFlag;
}
public Map<String, String> getOptions() {
return this.options;
}
public void setOptions(Map<String, String> options) {
if (options != null) {
this.options.putAll(options);
}
}
}
} }
...@@ -20,6 +20,8 @@ import java.io.File; ...@@ -20,6 +20,8 @@ import java.io.File;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
...@@ -38,6 +40,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory; ...@@ -38,6 +40,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode; import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -160,6 +163,7 @@ public class KafkaAutoConfigurationTests { ...@@ -160,6 +163,7 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class); .isEqualTo(IntegerSerializer.class);
assertThat(this.context.containsBean("kafkaJaasInitializer")).isFalse();
} }
@Test @Test
...@@ -169,7 +173,11 @@ public class KafkaAutoConfigurationTests { ...@@ -169,7 +173,11 @@ public class KafkaAutoConfigurationTests {
"spring.kafka.listener.ack-count=123", "spring.kafka.listener.ack-count=123",
"spring.kafka.listener.ack-time=456", "spring.kafka.listener.ack-time=456",
"spring.kafka.listener.concurrency=3", "spring.kafka.listener.concurrency=3",
"spring.kafka.listener.poll-timeout=2000"); "spring.kafka.listener.poll-timeout=2000",
"spring.kafka.jaas.enabled=true",
"spring.kafka.jaas.login-module=foo",
"spring.kafka.jaas.control-flag=REQUISITE",
"spring.kafka.jaas.options.useKeyTab=true");
DefaultKafkaProducerFactory<?, ?> producerFactory = this.context DefaultKafkaProducerFactory<?, ?> producerFactory = this.context
.getBean(DefaultKafkaProducerFactory.class); .getBean(DefaultKafkaProducerFactory.class);
DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context DefaultKafkaConsumerFactory<?, ?> consumerFactory = this.context
...@@ -189,6 +197,13 @@ public class KafkaAutoConfigurationTests { ...@@ -189,6 +197,13 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L); .isEqualTo(2000L);
assertThat(this.context.containsBean("kafkaJaasInitializer")).isTrue();
KafkaJaasLoginModuleInitializer jaas = this.context.getBean(KafkaJaasLoginModuleInitializer.class);
dfa = new DirectFieldAccessor(jaas);
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag"))
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(((Map<?, ?>) dfa.getPropertyValue("options")).get("useKeyTab")).isEqualTo("true");
} }
private void load(String... environment) { private void load(String... environment) {
......
...@@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need. ...@@ -942,6 +942,10 @@ content into your application; rather pick only the properties that you need.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=REQUIRED # AppConfigurationEntry.LoginModuleControlFlag value.
spring.kafka.jaas.enabled= # Enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment