Commit b39479bc authored by Andy Wilkinson's avatar Andy Wilkinson

Polish "Add config property for Kafka consumer isolation level"

See gh-17389
parent 1b8f955f
...@@ -23,12 +23,14 @@ import java.util.ArrayList; ...@@ -23,12 +23,14 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.StringSerializer;
...@@ -267,11 +269,9 @@ public class KafkaProperties { ...@@ -267,11 +269,9 @@ public class KafkaProperties {
private Duration heartbeatInterval; private Duration heartbeatInterval;
/** /**
* Controls how transactional messages are returned when polling the broker * Isolation level for reading messages that have been written transactionally.
* (non-transactional messages will be unconditionally returned, regardless of
* this setting).
*/ */
private String isolationLevel; private IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
/** /**
* Deserializer class for keys. * Deserializer class for keys.
...@@ -369,11 +369,11 @@ public class KafkaProperties { ...@@ -369,11 +369,11 @@ public class KafkaProperties {
this.heartbeatInterval = heartbeatInterval; this.heartbeatInterval = heartbeatInterval;
} }
public String getIsolationLevel() { public IsolationLevel getIsolationLevel() {
return this.isolationLevel; return this.isolationLevel;
} }
public void setIsolationLevel(String isolationLevel) { public void setIsolationLevel(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel; this.isolationLevel = isolationLevel;
} }
...@@ -421,7 +421,8 @@ public class KafkaProperties { ...@@ -421,7 +421,8 @@ public class KafkaProperties {
map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG)); map.from(this::getGroupId).to(properties.in(ConsumerConfig.GROUP_ID_CONFIG));
map.from(this::getHeartbeatInterval).asInt(Duration::toMillis) map.from(this::getHeartbeatInterval).asInt(Duration::toMillis)
.to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)); .to(properties.in(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG));
map.from(this::getIsolationLevel).to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG)); map.from(() -> getIsolationLevel().name().toLowerCase(Locale.ROOT))
.to(properties.in(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
......
...@@ -2228,24 +2228,6 @@ ...@@ -2228,24 +2228,6 @@
} }
] ]
}, },
{
"name": "spring.kafka.consumer.isolation-level",
"values": [
{
"value": "read_committed",
"description": "Only consume transactional messages that have been committed."
},
{
"value": "read_uncommitted",
"description": "Consume all transactional messages (even those that have been aborted)."
}
],
"providers": [
{
"name": "any"
}
]
},
{ {
"name": "spring.kafka.producer.key-serializer", "name": "spring.kafka.producer.key-serializer",
"providers": [ "providers": [
......
...@@ -105,6 +105,7 @@ class KafkaAutoConfigurationTests { ...@@ -105,6 +105,7 @@ class KafkaAutoConfigurationTests {
"spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456", "spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456",
"spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB", "spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=1KB",
"spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234", "spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234",
"spring.kafka.consumer.isolation-level = read-committed",
"spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer", "spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.LongDeserializer",
"spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer") "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer")
.run((context) -> { .run((context) -> {
...@@ -133,6 +134,7 @@ class KafkaAutoConfigurationTests { ...@@ -133,6 +134,7 @@ class KafkaAutoConfigurationTests {
assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024); assertThat(configs.get(ConsumerConfig.FETCH_MIN_BYTES_CONFIG)).isEqualTo(1024);
assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar"); assertThat(configs.get(ConsumerConfig.GROUP_ID_CONFIG)).isEqualTo("bar");
assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234); assertThat(configs.get(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG)).isEqualTo(234);
assertThat(configs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG)).isEqualTo("read_committed");
assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(LongDeserializer.class); .isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment