Commit f4799c74 authored by Adrien Bennadji's avatar Adrien Bennadji Committed by Stephane Nicoll

Add configuration options for Kafka Stream's CleanupConfig

See gh-23636
parent a5b27789
...@@ -37,6 +37,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; ...@@ -37,6 +37,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.convert.DurationUnit; import org.springframework.boot.convert.DurationUnit;
import org.springframework.core.io.Resource; import org.springframework.core.io.Resource;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
...@@ -722,6 +723,11 @@ public class KafkaProperties { ...@@ -722,6 +723,11 @@ public class KafkaProperties {
*/ */
private String stateDir; private String stateDir;
/**
* Cleanup configuration for the state stores.
*/
private Cleanup cleanup;
/** /**
* Additional Kafka properties used to configure the streams. * Additional Kafka properties used to configure the streams.
*/ */
...@@ -791,6 +797,14 @@ public class KafkaProperties { ...@@ -791,6 +797,14 @@ public class KafkaProperties {
this.stateDir = stateDir; this.stateDir = stateDir;
} }
public Cleanup getCleanup() {
return cleanup;
}
public void setCleanup(Cleanup cleanup) {
this.cleanup = cleanup;
}
public Map<String, String> getProperties() { public Map<String, String> getProperties() {
return this.properties; return this.properties;
} }
...@@ -1259,6 +1273,32 @@ public class KafkaProperties { ...@@ -1259,6 +1273,32 @@ public class KafkaProperties {
} }
public static class Cleanup {
/**
* Cleanup the application's state on start.
*/
private boolean onStart = false;
/**
* Cleanup the application's state on stop.
*/
private boolean onStop = true;
public CleanupConfig buildCleanupConfig() {
return new CleanupConfig(this.onStart, this.onStop);
}
public boolean isOnStart() {
return onStart;
}
public boolean isOnStop() {
return onStop;
}
}
@SuppressWarnings("serial") @SuppressWarnings("serial")
private static class Properties extends HashMap<String, Object> { private static class Properties extends HashMap<String, Object> {
......
...@@ -91,6 +91,11 @@ class KafkaStreamsAnnotationDrivenConfiguration { ...@@ -91,6 +91,11 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup()); this.factoryBean.setAutoStartup(this.properties.getStreams().isAutoStartup());
KafkaProperties.Cleanup cleanup = this.properties.getStreams().getCleanup();
if (cleanup != null) {
this.factoryBean.setCleanupConfig(cleanup.buildCleanupConfig());
}
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment