Commit a7acbbd6 authored by Gary Russell's avatar Gary Russell Committed by Stephane Nicoll

Add Kafka Streams auto-configuration

See gh-14021
parent f9207dd9
......@@ -132,6 +132,11 @@
<artifactId>jest</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.flywaydb</groupId>
<artifactId>flyway-core</artifactId>
......
......@@ -17,7 +17,11 @@
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
......@@ -28,12 +32,17 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.StreamsBuilderFactoryBean;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.kafka.support.LoggingProducerListener;
import org.springframework.kafka.support.ProducerListener;
......@@ -138,4 +147,57 @@ public class KafkaAutoConfiguration {
return kafkaAdmin;
}
/**
 * Auto-configuration for Kafka Streams, activated when the {@code kafka-streams} jar
 * is on the classpath. Exposes the default {@link KafkaStreamsConfiguration} built
 * from {@link KafkaProperties} and enables spring-kafka's streams support.
 */
@Configuration
@ConditionalOnClass(StreamsBuilder.class)
public static class KafkaStreamsAutoConfiguration {

	/**
	 * Build the default streams configuration from the Boot properties.
	 * <p>
	 * {@code application.id} is mandatory for Kafka Streams; when it is not set via
	 * {@code spring.kafka.streams.application-id}, fall back to the
	 * {@code spring.application.name} property if one is available.
	 * @param properties the Kafka properties
	 * @param environment the environment, used for the application-name fallback
	 * @return the default streams configuration
	 */
	@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
	public KafkaStreamsConfiguration defaultKafkaStreamsConfig(
			KafkaProperties properties, Environment environment) {
		Map<String, Object> streamsProperties = properties.buildStreamsProperties();
		if (properties.getStreams().getApplicationId() == null) {
			// Fix: the guard previously tested "spring.application.id" but then
			// read "spring.application.name"; check and use the same property.
			String applicationName = environment
					.getProperty("spring.application.name");
			if (applicationName != null) {
				streamsProperties.put("application.id", applicationName);
			}
		}
		return new KafkaStreamsConfiguration(streamsProperties);
	}

	/**
	 * Configurer that applies the {@code auto-startup} flag to the auto-configured
	 * {@link StreamsBuilderFactoryBean}.
	 */
	@Bean
	public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
			StreamsBuilderFactoryBean factoryBean, KafkaProperties properties) {
		return new KafkaStreamsFactoryBeanConfigurer(factoryBean, properties);
	}

	// Separate configuration so that @EnableKafkaStreams is only processed when
	// the enclosing @ConditionalOnClass condition matches.
	@Configuration
	@EnableKafkaStreams
	public static class EnableKafkaStreamsAutoConfiguration {

	}

	/**
	 * Propagates {@code spring.kafka.streams.auto-startup} to the factory bean
	 * before the context starts it.
	 */
	static class KafkaStreamsFactoryBeanConfigurer implements InitializingBean {

		private final StreamsBuilderFactoryBean factoryBean;

		private final KafkaProperties properties;

		KafkaStreamsFactoryBeanConfigurer(StreamsBuilderFactoryBean factoryBean,
				KafkaProperties properties) {
			this.factoryBean = factoryBean;
			this.properties = properties;
		}

		@Override
		public void afterPropertiesSet() throws Exception {
			this.factoryBean
					.setAutoStartup(this.properties.getStreams().isAutoStartup());
		}

	}

}
}
......@@ -57,7 +57,7 @@ public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Applies to all components unless overridden.
*/
private List<String> bootstrapServers = new ArrayList<>(
Collections.singletonList("localhost:9092"));
......@@ -79,6 +79,8 @@ public class KafkaProperties {
private final Admin admin = new Admin();
private final Streams streams = new Streams();
private final Listener listener = new Listener();
private final Ssl ssl = new Ssl();
......@@ -123,6 +125,10 @@ public class KafkaProperties {
return this.admin;
}
/**
 * Return the Kafka Streams-specific properties ({@code spring.kafka.streams.*}).
 * @return the streams properties
 */
public Streams getStreams() {
return this.streams;
}
public Ssl getSsl() {
return this.ssl;
}
......@@ -193,6 +199,19 @@ public class KafkaProperties {
return properties;
}
/**
 * Create an initial map of streams properties from the state of this instance.
 * <p>
 * Callers may add further entries to the returned map if necessary.
 * @return the streams properties initialized with the customizations defined on this
 * instance
 */
public Map<String, Object> buildStreamsProperties() {
	// Start from the shared spring.kafka.* values, then let the
	// streams-specific settings override them.
	Map<String, Object> streamsProperties = buildCommonProperties();
	streamsProperties.putAll(this.streams.buildProperties());
	return streamsProperties;
}
public static class Consumer {
private final Ssl ssl = new Ssl();
......@@ -211,7 +230,7 @@ public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Overrides the global property, for consumers.
*/
private List<String> bootstrapServers;
......@@ -421,7 +440,7 @@ public class KafkaProperties {
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connection to the Kafka cluster.
* connection to the Kafka cluster. Overrides the global property, for producers.
*/
private List<String> bootstrapServers;
......@@ -631,6 +650,136 @@ public class KafkaProperties {
}
/**
* High (and some medium) priority Streams properties and a general properties bucket.
*/
/**
 * High (and some medium) priority Streams properties and a general properties bucket.
 * Field names form the {@code spring.kafka.streams.*} binding contract and must not
 * change.
 */
public static class Streams {

	private final Ssl ssl = new Ssl();

	/**
	 * Kafka streams application.id property; default spring.application.name.
	 */
	private String applicationId;

	/**
	 * Whether or not to auto-start the streams factory bean.
	 */
	private boolean autoStartup;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connection to the Kafka cluster. Overrides the global property, for streams.
	 */
	private List<String> bootstrapServers;

	/**
	 * Maximum number of memory bytes to be used for buffering across all threads.
	 */
	private Integer cacheMaxBytesBuffering;

	/**
	 * ID to pass to the server when making requests. Used for server-side logging.
	 */
	private String clientId;

	/**
	 * The replication factor for change log topics and repartition topics created by
	 * the stream processing application.
	 */
	private Integer replicationFactor;

	/**
	 * Directory location for the state store.
	 */
	private String stateDir;

	/**
	 * Additional Kafka properties used to configure the streams.
	 */
	private final Map<String, String> properties = new HashMap<>();

	public Ssl getSsl() {
		return this.ssl;
	}

	public String getApplicationId() {
		return this.applicationId;
	}

	public void setApplicationId(String applicationId) {
		this.applicationId = applicationId;
	}

	public boolean isAutoStartup() {
		return this.autoStartup;
	}

	public void setAutoStartup(boolean autoStartup) {
		this.autoStartup = autoStartup;
	}

	public List<String> getBootstrapServers() {
		return this.bootstrapServers;
	}

	public void setBootstrapServers(List<String> bootstrapServers) {
		this.bootstrapServers = bootstrapServers;
	}

	public Integer getCacheMaxBytesBuffering() {
		return this.cacheMaxBytesBuffering;
	}

	public void setCacheMaxBytesBuffering(Integer cacheMaxBytesBuffering) {
		this.cacheMaxBytesBuffering = cacheMaxBytesBuffering;
	}

	public String getClientId() {
		return this.clientId;
	}

	public void setClientId(String clientId) {
		this.clientId = clientId;
	}

	public Integer getReplicationFactor() {
		return this.replicationFactor;
	}

	public void setReplicationFactor(Integer replicationFactor) {
		this.replicationFactor = replicationFactor;
	}

	public String getStateDir() {
		return this.stateDir;
	}

	public void setStateDir(String stateDir) {
		this.stateDir = stateDir;
	}

	public Map<String, String> getProperties() {
		return this.properties;
	}

	/**
	 * Translate the configured values into Kafka Streams configuration entries,
	 * then merge in the SSL settings and the free-form properties map.
	 * @return the streams configuration properties
	 */
	public Map<String, Object> buildProperties() {
		Properties properties = new Properties();
		// Null values are skipped so that Kafka's own defaults apply.
		if (this.applicationId != null) {
			properties.put("application.id", this.applicationId);
		}
		if (this.bootstrapServers != null) {
			properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
					this.bootstrapServers);
		}
		if (this.cacheMaxBytesBuffering != null) {
			properties.put("cache.max.bytes.buffering",
					this.cacheMaxBytesBuffering);
		}
		if (this.clientId != null) {
			properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
		}
		if (this.replicationFactor != null) {
			properties.put("replication.factor", this.replicationFactor);
		}
		if (this.stateDir != null) {
			properties.put("state.dir", this.stateDir);
		}
		return properties.with(this.ssl, this.properties);
	}

}
public static class Template {
/**
......@@ -1011,6 +1160,7 @@ public class KafkaProperties {
}
@SuppressWarnings("serial")
private static class Properties extends HashMap<String, Object> {
public <V> java.util.function.Consumer<V> in(String key) {
......
......@@ -19,6 +19,7 @@ package org.springframework.boot.autoconfigure.kafka;
import java.io.File;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import javax.security.auth.login.AppConfigurationEntry;
......@@ -30,6 +31,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.Test;
import org.springframework.beans.DirectFieldAccessor;
......@@ -37,8 +39,10 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
......@@ -273,6 +277,68 @@ public class KafkaAutoConfigurationTests {
});
}
// Verifies that the auto-configured KafkaStreamsConfiguration bean merges the
// common spring.kafka.* properties with the streams-specific overrides
// (spring.kafka.streams.*), including SSL settings and arbitrary entries from
// both "properties" maps, and that the default StreamsBuilder bean is created.
@Test
public void streamsProperties() {
this.contextRunner.withPropertyValues("spring.kafka.clientId=cid",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.application.name=appName",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.streams.cache-max-bytes-buffering=42",
"spring.kafka.streams.client-id=override",
"spring.kafka.streams.properties.fiz.buz=fix.fox",
"spring.kafka.streams.replication-factor=2",
"spring.kafka.streams.state-dir=/tmp/state",
"spring.kafka.streams.ssl.key-password=p7",
"spring.kafka.streams.ssl.key-store-location=classpath:ksLocP",
"spring.kafka.streams.ssl.key-store-password=p8",
"spring.kafka.streams.ssl.key-store-type=PKCS12",
"spring.kafka.streams.ssl.trust-store-location=classpath:tsLocP",
"spring.kafka.streams.ssl.trust-store-password=p9",
"spring.kafka.streams.ssl.trust-store-type=PKCS12",
"spring.kafka.streams.ssl.protocol=TLSv1.2").run((context) -> {
// Look up the streams config under its well-known default bean name.
Properties configs = context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME,
KafkaStreamsConfiguration.class).asProperties();
// Global bootstrap servers apply since no streams-level override is set.
assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG))
.isEqualTo("localhost:9092, localhost:9093");
assertThat(
configs.get(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG))
.isEqualTo("42");
// streams.client-id overrides the global spring.kafka.clientId.
assertThat(configs.get(StreamsConfig.CLIENT_ID_CONFIG))
.isEqualTo("override");
assertThat(configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG))
.isEqualTo("2");
assertThat(configs.get(StreamsConfig.STATE_DIR_CONFIG))
.isEqualTo("/tmp/state");
assertThat(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG))
.isEqualTo("p7");
// classpath: locations are resolved to absolute file paths.
assertThat(
(String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "ksLocP");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG))
.isEqualTo("p8");
assertThat(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat((String) configs
.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG))
.endsWith(File.separator + "tsLocP");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG))
.isEqualTo("p9");
assertThat(configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG))
.isEqualTo("PKCS12");
assertThat(configs.get(SslConfigs.SSL_PROTOCOL_CONFIG))
.isEqualTo("TLSv1.2");
// No JAAS configured, so the initializer bean must be absent.
assertThat(
context.getBeansOfType(KafkaJaasLoginModuleInitializer.class))
.isEmpty();
// Common and streams-specific free-form properties both survive the merge.
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox");
assertThat(context.getBean(
KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME))
.isNotNull();
});
}
@SuppressWarnings("unchecked")
@Test
public void listenerProperties() {
......
......@@ -1039,11 +1039,11 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.admin.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.admin.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.admin.ssl.trust-store-type= # Type of the trust store.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Applies to all components unless overridden.
spring.kafka.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.auto-commit-interval= # Frequency with which the consumer offsets are auto-committed to Kafka if 'enable.auto.commit' is set to true.
spring.kafka.consumer.auto-offset-reset= # What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.consumer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for consumers.
spring.kafka.consumer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.consumer.enable-auto-commit= # Whether the consumer's offset is periodically committed in the background.
spring.kafka.consumer.fetch-max-wait= # Maximum amount of time the server blocks before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by "fetch.min.bytes".
......@@ -1079,7 +1079,7 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.listener.type=single # Listener type.
spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before considering a request complete.
spring.kafka.producer.batch-size= # Default batch size in bytes.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster.
spring.kafka.producer.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for producers.
spring.kafka.producer.buffer-memory= # Total bytes of memory the producer can use to buffer records waiting to be sent to the server.
spring.kafka.producer.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.producer.compression-type= # Compression type for all data generated by the producer.
......@@ -1105,6 +1105,21 @@ content into your application. Rather, pick only the properties that you need.
spring.kafka.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.auto-startup= # Whether or not to auto-start the streams factory bean.
spring.kafka.streams.bootstrap-servers= # Comma-delimited list of host:port pairs to use for establishing the initial connection to the Kafka cluster. Overrides the global property, for streams.
spring.kafka.streams.cache-max-bytes-buffering= # Maximum number of memory bytes to be used for buffering across all threads.
spring.kafka.streams.client-id= # ID to pass to the server when making requests. Used for server-side logging.
spring.kafka.streams.properties.*= # Additional Kafka properties used to configure the streams.
spring.kafka.streams.replication-factor= # The replication factor for change log topics and repartition topics created by the stream processing application.
spring.kafka.streams.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.streams.ssl.key-store-location= # Location of the key store file.
spring.kafka.streams.ssl.key-store-password= # Store password for the key store file.
spring.kafka.streams.ssl.key-store-type= # Type of the key store.
spring.kafka.streams.ssl.protocol= # SSL protocol to use.
spring.kafka.streams.ssl.trust-store-location= # Location of the trust store file.
spring.kafka.streams.ssl.trust-store-password= # Store password for the trust store file.
spring.kafka.streams.ssl.trust-store-type= # Type of the trust store.
spring.kafka.streams.state-dir= # Directory location for the state store.
spring.kafka.template.default-topic= # Default topic to which messages are sent.
# RABBIT ({sc-spring-boot-autoconfigure}/amqp/RabbitProperties.{sc-ext}[RabbitProperties])
......
......@@ -5634,6 +5634,44 @@ The following component creates a listener endpoint on the `someTopic` topic:
}
----
[[boot-features-kafka-streams]]
==== Kafka Streams
Spring for Apache Kafka provides a factory bean to create a `StreamsBuilder` object and
manage the lifecycle of its streams; the factory bean is created when
`@EnableKafkaStreams` is present on a `@Configuration` class.
The factory bean requires a `KafkaStreamsConfiguration` object for streams configuration.
If Spring Boot detects the `kafka-streams` jar on the classpath, it will auto-configure
the `KafkaStreamsConfiguration` bean from the `KafkaProperties` object as well as enabling
the creation of the factory bean by spring-kafka.
There are two required Kafka properties for streams (`bootstrap.servers` and
`application.id`); by default, the `application.id` is set to the `spring.application.name`
property, if present.
The `bootstrap.servers` can be set globally or specifically overridden just for streams.
Several other properties are specifically available as boot properties; other arbitrary
Kafka properties can be set using the `spring.kafka.streams.properties` property.
See <<boot-features-kafka-extra-props>> for more information.
To use the factory bean, wire the `StreamsBuilder` it manages into your `@Bean` definitions.
====
[source, java]
----
@Bean
public KStream<Integer, String> kStream(StreamsBuilder streamsBuilder) {
KStream<Integer, String> stream = streamsBuilder.stream("ks1In");
stream.map((k, v) -> new KeyValue(k, v.toUpperCase()))
.to("ks1Out", Produced.with(Serdes.Integer(), new JsonSerde<>()));
return stream;
}
----
====
By default, the factory bean `autoStartup` property is false; to automatically start the
streams managed by the `StreamsBuilder` object it creates, set property
`spring.kafka.streams.auto-startup=true`.
[[boot-features-kafka-extra-props]]
......@@ -5643,13 +5681,14 @@ The properties supported by auto configuration are shown in
(hyphenated or camelCase) map directly to the Apache Kafka dotted properties. Refer to the
Apache Kafka documentation for details.
The first few of these properties apply to both producers and consumers but can be
specified at the producer or consumer level if you wish to use different values for each.
The first few of these properties apply to all components (producers, consumers, admins,
and streams) but can be specified at the component level if you wish to use different
values.
Apache Kafka designates properties with an importance of HIGH, MEDIUM, or LOW. Spring Boot
auto-configuration supports all HIGH importance properties, some selected MEDIUM and LOW
properties, and any properties that do not have a default value.
Only a subset of the properties supported by Kafka are available through the
Only a subset of the properties supported by Kafka are available directly through the
`KafkaProperties` class. If you wish to configure the producer or consumer with additional
properties that are not directly supported, use the following properties:
......@@ -5659,11 +5698,13 @@ properties that are not directly supported, use the following properties:
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth
spring.kafka.streams.properties.prop.five=fifth
----
This sets the common `prop.one` Kafka property to `first` (applies to producers,
consumers and admins), the `prop.two` admin property to `second`, the `prop.three`
consumer property to `third` and the `prop.four` producer property to `fourth`.
consumer property to `third`, the `prop.four` producer property to `fourth` and the
`prop.five` streams property to `fifth`.
You can also configure the Spring Kafka `JsonDeserializer` as follows:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment