Commit b960fa32 authored by Stephane Nicoll's avatar Stephane Nicoll

Polish "Add configuration options for Kafka Stream's CleanupConfig"

See gh-23636
parent f4799c74
...@@ -37,7 +37,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; ...@@ -37,7 +37,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.convert.DurationUnit; import org.springframework.boot.convert.DurationUnit;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -686,6 +685,8 @@ public class KafkaProperties { ...@@ -686,6 +685,8 @@ public class KafkaProperties {
private final Security security = new Security(); private final Security security = new Security();
private final Cleanup cleanup = new Cleanup();
/** /**
* Kafka streams application.id property; default spring.application.name. * Kafka streams application.id property; default spring.application.name.
*/ */
...@@ -723,11 +724,6 @@ public class KafkaProperties { ...@@ -723,11 +724,6 @@ public class KafkaProperties {
*/ */
private String stateDir; private String stateDir;
/**
* Cleanup configuration for the state stores.
*/
private Cleanup cleanup;
/** /**
* Additional Kafka properties used to configure the streams. * Additional Kafka properties used to configure the streams.
*/ */
...@@ -741,6 +737,10 @@ public class KafkaProperties { ...@@ -741,6 +737,10 @@ public class KafkaProperties {
return this.security; return this.security;
} }
public Cleanup getCleanup() {
return this.cleanup;
}
public String getApplicationId() { public String getApplicationId() {
return this.applicationId; return this.applicationId;
} }
...@@ -797,14 +797,6 @@ public class KafkaProperties { ...@@ -797,14 +797,6 @@ public class KafkaProperties {
this.stateDir = stateDir; this.stateDir = stateDir;
} }
public Cleanup getCleanup() {
return cleanup;
}
public void setCleanup(Cleanup cleanup) {
this.cleanup = cleanup;
}
public Map<String, String> getProperties() { public Map<String, String> getProperties() {
return this.properties; return this.properties;
} }
...@@ -1248,53 +1240,57 @@ public class KafkaProperties { ...@@ -1248,53 +1240,57 @@ public class KafkaProperties {
} }
public enum IsolationLevel { public static class Cleanup {
/** /**
* Read everything including aborted transactions. * Cleanup the application’s local state directory on startup.
*/ */
READ_UNCOMMITTED((byte) 0), private boolean onStartup = false;
/** /**
* Read records from committed transactions, in addition to records not part of * Cleanup the application’s local state directory on shutdown.
* transactions.
*/ */
READ_COMMITTED((byte) 1); private boolean onShutdown = true;
private final byte id; public boolean isOnStartup() {
return this.onStartup;
}
IsolationLevel(byte id) { public void setOnStartup(boolean onStartup) {
this.id = id; this.onStartup = onStartup;
} }
public byte id() { public boolean isOnShutdown() {
return this.id; return this.onShutdown;
}
public void setOnShutdown(boolean onShutdown) {
this.onShutdown = onShutdown;
} }
} }
public static class Cleanup { public enum IsolationLevel {
/** /**
* Cleanup the application's state on start. * Read everything including aborted transactions.
*/ */
private boolean onStart = false; READ_UNCOMMITTED((byte) 0),
/** /**
* Cleanup the application's state on stop. * Read records from committed transactions, in addition to records not part of
* transactions.
*/ */
private boolean onStop = true; READ_COMMITTED((byte) 1);
public CleanupConfig buildCleanupConfig() { private final byte id;
return new CleanupConfig(this.onStart, this.onStop);
}
public boolean isOnStart() { IsolationLevel(byte id) {
return onStart; this.id = id;
} }
public boolean isOnStop() { public byte id() {
return onStop; return this.id;
} }
} }
......
...@@ -34,6 +34,7 @@ import org.springframework.core.env.Environment; ...@@ -34,6 +34,7 @@ import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration; import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
/** /**
* Configuration for Kafka Streams annotation-driven support. * Configuration for Kafka Streams annotation-driven support.
...@@ -91,11 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration { ...@@ -91,11 +92,9 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup(); KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
if (cleanup != null) { CleanupConfig cleanupConfig = new CleanupConfig(cleanup.isOnStartup(), cleanup.isOnShutdown());
this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig()); this.factoryBean.setCleanupConfig(cleanupConfig);
}
} }
} }
......
...@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer; ...@@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
...@@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; ...@@ -50,6 +51,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaStreamsConfiguration; import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
...@@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests { ...@@ -340,6 +342,26 @@ class KafkaAutoConfigurationTests {
}); });
} }
@Test
void streamsWithCleanupConfig() {
this.contextRunner
.withUserConfiguration(EnableKafkaStreamsConfiguration.class, TestKafkaStreamsConfiguration.class)
.withPropertyValues("spring.application.name=my-test-app",
"spring.kafka.bootstrap-servers=localhost:9092,localhost:9093",
"spring.kafka.streams.auto-startup=false", "spring.kafka.streams.cleanup.on-startup=true",
"spring.kafka.streams.cleanup.on-shutdown=false")
.run((context) -> {
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context
.getBean(StreamsBuilderFactoryBean.class);
assertThat(streamsBuilderFactoryBean)
.extracting("cleanupConfig", InstanceOfAssertFactories.type(CleanupConfig.class))
.satisfies((cleanupConfig) -> {
assertThat(cleanupConfig.cleanupOnStart()).isTrue();
assertThat(cleanupConfig.cleanupOnStop()).isFalse();
});
});
}
@Test @Test
void streamsApplicationIdIsMandatory() { void streamsApplicationIdIsMandatory() {
this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> { this.contextRunner.withUserConfiguration(EnableKafkaStreamsConfiguration.class).run((context) -> {
......
...@@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka; ...@@ -18,8 +18,10 @@ package org.springframework.boot.autoconfigure.kafka;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.listener.ContainerProperties; import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
...@@ -48,4 +50,12 @@ class KafkaPropertiesTests { ...@@ -48,4 +50,12 @@ class KafkaPropertiesTests {
assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal()); assertThat(listenerProperties.isMissingTopicsFatal()).isEqualTo(container.isMissingTopicsFatal());
} }
@Test
void cleanupConfigDefaultValuesAreConsistent() {
CleanupConfig cleanupConfig = new CleanupConfig();
Cleanup cleanup = new KafkaProperties().getStreams().getCleanup();
assertThat(cleanup.isOnStartup()).isEqualTo(cleanupConfig.cleanupOnStart());
assertThat(cleanup.isOnShutdown()).isEqualTo(cleanupConfig.cleanupOnStop());
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment