Commit 7f53aef5 authored by Andy Wilkinson's avatar Andy Wilkinson

Upgrade to Kafka 2.6.0

Closes gh-22731
parent a8130da3
...@@ -16,12 +16,19 @@ ...@@ -16,12 +16,19 @@
package org.springframework.boot.actuate.autoconfigure.metrics; package org.springframework.boot.actuate.autoconfigure.metrics;
import java.util.regex.Pattern;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.boot.actuate.autoconfigure.metrics.test.MetricsRun; import org.springframework.boot.actuate.autoconfigure.metrics.test.MetricsRun;
import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams; import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean; import org.springframework.kafka.config.StreamsBuilderFactoryBean;
...@@ -93,6 +100,12 @@ class KafkaMetricsAutoConfigurationTests { ...@@ -93,6 +100,12 @@ class KafkaMetricsAutoConfigurationTests {
@EnableKafkaStreams @EnableKafkaStreams
static class EnableKafkaStreamsConfiguration { static class EnableKafkaStreamsConfiguration {
@Bean
public KTable<?, ?> table(StreamsBuilder builder) {
KStream<Object, Object> stream = builder.stream(Pattern.compile("test"));
return stream.groupByKey().count(Materialized.as("store"));
}
} }
} }
...@@ -18,9 +18,14 @@ package org.springframework.boot.autoconfigure.kafka; ...@@ -18,9 +18,14 @@ package org.springframework.boot.autoconfigure.kafka;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
...@@ -124,6 +129,12 @@ class KafkaAutoConfigurationIntegrationTests { ...@@ -124,6 +129,12 @@ class KafkaAutoConfigurationIntegrationTests {
@EnableKafkaStreams @EnableKafkaStreams
static class KafkaStreamsConfig { static class KafkaStreamsConfig {
@Bean
public KTable<?, ?> table(StreamsBuilder builder) {
KStream<Object, Object> stream = builder.stream(Pattern.compile("test"));
return stream.groupByKey().count(Materialized.as("store"));
}
} }
static class Listener { static class Listener {
......
...@@ -892,7 +892,7 @@ bom { ...@@ -892,7 +892,7 @@ bom {
] ]
} }
} }
library("Kafka", "2.5.0") { library("Kafka", "2.6.0") {
group("org.apache.kafka") { group("org.apache.kafka") {
modules = [ modules = [
"connect-api", "connect-api",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment