Commit 1480f071 authored by Stephane Nicoll's avatar Stephane Nicoll

Polish "Add Kafka Kerberos Configuration Properties"

Closes gh-9151
parent c4cfc4dd
...@@ -40,6 +40,7 @@ import org.springframework.kafka.support.ProducerListener; ...@@ -40,6 +40,7 @@ import org.springframework.kafka.support.ProducerListener;
* {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka. * {@link EnableAutoConfiguration Auto-configuration} for Apache Kafka.
* *
* @author Gary Russell * @author Gary Russell
* @author Stephane Nicoll
* @since 1.5.0 * @since 1.5.0
*/ */
@Configuration @Configuration
...@@ -88,7 +89,7 @@ public class KafkaAutoConfiguration { ...@@ -88,7 +89,7 @@ public class KafkaAutoConfiguration {
@Bean @Bean
@ConditionalOnProperty(name = "spring.kafka.jaas.enabled") @ConditionalOnProperty(name = "spring.kafka.jaas.enabled")
@ConditionalOnMissingBean(KafkaJaasLoginModuleInitializer.class) @ConditionalOnMissingBean
public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException { public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer(); KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
Jaas jaasProperties = this.properties.getJaas(); Jaas jaasProperties = this.properties.getJaas();
......
...@@ -793,16 +793,16 @@ public class KafkaProperties { ...@@ -793,16 +793,16 @@ public class KafkaProperties {
/** /**
* Login module. * Login module.
*/ */
private String loginModule; private String loginModule = "com.sun.security.auth.module.Krb5LoginModule";
/** /**
* AppConfigurationEntry.LoginModuleControlFlag value. * Control flag for login configuration.
*/ */
private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag = private KafkaJaasLoginModuleInitializer.ControlFlag controlFlag =
KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED; KafkaJaasLoginModuleInitializer.ControlFlag.REQUIRED;
/** /**
* Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. * Additional JAAS options.
*/ */
private final Map<String, String> options = new HashMap<>(); private final Map<String, String> options = new HashMap<>();
......
...@@ -325,6 +325,10 @@ ...@@ -325,6 +325,10 @@
"description": "Log a warning for transactions executed without a single enlisted resource.", "description": "Log a warning for transactions executed without a single enlisted resource.",
"defaultValue": true "defaultValue": true
}, },
{
"name": "spring.kafka.jaas.control-flag",
"defaultValue": "required"
},
{ {
"name": "spring.mobile.devicedelegatingviewresolver.enabled", "name": "spring.mobile.devicedelegatingviewresolver.enabled",
"type": "java.lang.Boolean", "type": "java.lang.Boolean",
......
/* /*
* Copyright 2012-2016 the original author or authors. * Copyright 2012-2017 the original author or authors.
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
...@@ -43,6 +43,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo ...@@ -43,6 +43,7 @@ import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMo
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
/** /**
* Tests for {@link KafkaAutoConfiguration}. * Tests for {@link KafkaAutoConfiguration}.
...@@ -163,7 +164,8 @@ public class KafkaAutoConfigurationTests { ...@@ -163,7 +164,8 @@ public class KafkaAutoConfigurationTests {
assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2); assertThat(configs.get(ProducerConfig.RETRIES_CONFIG)).isEqualTo(2);
assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerSerializer.class); .isEqualTo(IntegerSerializer.class);
assertThat(this.context.containsBean("kafkaJaasInitializer")).isFalse(); assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
} }
@Test @Test
...@@ -197,13 +199,16 @@ public class KafkaAutoConfigurationTests { ...@@ -197,13 +199,16 @@ public class KafkaAutoConfigurationTests {
assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3); assertThat(dfa.getPropertyValue("concurrency")).isEqualTo(3);
assertThat(dfa.getPropertyValue("containerProperties.pollTimeout")) assertThat(dfa.getPropertyValue("containerProperties.pollTimeout"))
.isEqualTo(2000L); .isEqualTo(2000L);
assertThat(this.context.containsBean("kafkaJaasInitializer")).isTrue(); assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
KafkaJaasLoginModuleInitializer jaas = this.context.getBean(KafkaJaasLoginModuleInitializer.class); .hasSize(1);
KafkaJaasLoginModuleInitializer jaas = this.context.getBean(
KafkaJaasLoginModuleInitializer.class);
dfa = new DirectFieldAccessor(jaas); dfa = new DirectFieldAccessor(jaas);
assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo"); assertThat(dfa.getPropertyValue("loginModule")).isEqualTo("foo");
assertThat(dfa.getPropertyValue("controlFlag")) assertThat(dfa.getPropertyValue("controlFlag"))
.isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE); .isEqualTo(AppConfigurationEntry.LoginModuleControlFlag.REQUISITE);
assertThat(((Map<?, ?>) dfa.getPropertyValue("options")).get("useKeyTab")).isEqualTo("true"); assertThat(((Map<String, String>) dfa.getPropertyValue("options")))
.containsExactly(entry("useKeyTab", "true"));
} }
private void load(String... environment) { private void load(String... environment) {
......
...@@ -942,10 +942,10 @@ content into your application; rather pick only the properties that you need. ...@@ -942,10 +942,10 @@ content into your application; rather pick only the properties that you need.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.value-deserializer= # Deserializer class for values. spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.jaas.control-flag=REQUIRED # AppConfigurationEntry.LoginModuleControlFlag value. spring.kafka.jaas.control-flag=required # Control flag for login configuration.
spring.kafka.jaas.enabled= # Enable JAAS configuration. spring.kafka.jaas.enabled= # Enable JAAS configuration.
spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module. spring.kafka.jaas.login-module=com.sun.security.auth.module.Krb5LoginModule # Login module.
spring.kafka.jaas.options= # Map of JAAS options, e.g. 'spring.kafka.jaas.options.useKeyTab=true'. spring.kafka.jaas.options= # Additional JAAS options.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME". spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation. spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME". spring.kafka.listener.ack-time= # Time in milliseconds between offset commits when ackMode is "TIME" or "COUNT_TIME".
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment