Commit 1b8f955f authored by Ryan Dunckel's avatar Ryan Dunckel Committed by Andy Wilkinson

Add config property for Kafka consumer isolation level

See gh-17389
parent 0d124e98
...@@ -266,6 +266,13 @@ public class KafkaProperties { ...@@ -266,6 +266,13 @@ public class KafkaProperties {
*/ */
private Duration heartbeatInterval; private Duration heartbeatInterval;
/**
* Controls how transactional messages are returned when polling the broker
* (non-transactional messages will be unconditionally returned, regardless of
* this setting).
*/
private String isolationLevel;
/** /**
* Deserializer class for keys. * Deserializer class for keys.
*/ */
...@@ -362,6 +369,14 @@ public class KafkaProperties { ...@@ -362,6 +369,14 @@ public class KafkaProperties {
this.heartbeatInterval = heartbeatInterval; this.heartbeatInterval = heartbeatInterval;
} }
public String getIsolationLevel() {
return this.isolationLevel;
}
public void setIsolationLevel(String isolationLevel) {
this.isolationLevel = isolationLevel;
}
public Class<?> getKeyDeserializer() { public Class<?> getKeyDeserializer() {
return this.keyDeserializer; return this.keyDeserializer;
} }
...@@ -406,6 +421,7 @@ public class KafkaProperties { ...@@ -406,6 +421,7 @@ public class KafkaProperties {
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
map.from(this::getIsolationLevel).to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
......
...@@ -2228,6 +2228,24 @@ ...@@ -2228,6 +2228,24 @@
} }
] ]
}, },
{
"name": "spring.kafka.consumer.isolation-level",
"values": [
{
"value": "read_committed",
"description": "Only consume transactional messages that have been committed."
},
{
"value": "read_uncommitted",
"description": "Consume all transactional messages (even those that have been aborted)."
}
],
"providers": [
{
"name": "any"
}
]
},
{ {
"name": "spring.kafka.producer.key-serializer", "name": "spring.kafka.producer.key-serializer",
"providers": [ "providers": [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment