Commit 349d7556 authored by Stephane Nicoll's avatar Stephane Nicoll

Start building against Spring Kafka 2.7.0-M2 snapshots

See gh-25295
parent 56a26349
......@@ -43,7 +43,6 @@ import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory;
......@@ -70,11 +69,9 @@ import org.springframework.kafka.support.converter.BatchMessageConverter;
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.support.converter.RecordMessageConverter;
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.transaction.PlatformTransactionManager;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.entry;
......@@ -560,13 +557,16 @@ class KafkaAutoConfigurationTests {
}
@Test
@SuppressWarnings("unchecked")
void testConcurrentKafkaListenerContainerFactoryWithCustomTransactionManager() {
this.contextRunner.withUserConfiguration(TransactionManagerConfiguration.class)
KafkaTransactionManager<Object, Object> customTransactionManager = mock(KafkaTransactionManager.class);
this.contextRunner
.withBean("customTransactionManager", KafkaTransactionManager.class, () -> customTransactionManager)
.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test").run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getTransactionManager())
.isSameAs(context.getBean("chainedTransactionManager"));
.isSameAs(context.getBean("customTransactionManager"));
});
}
......@@ -683,24 +683,12 @@ class KafkaAutoConfigurationTests {
}
@Configuration(proxyBeanMethods = false)
static class TransactionManagerConfiguration {
@Bean
@Primary
PlatformTransactionManager chainedTransactionManager(
KafkaTransactionManager<String, String> kafkaTransactionManager) {
return new ChainedKafkaTransactionManager<String, String>(kafkaTransactionManager);
}
}
@Configuration(proxyBeanMethods = false)
static class AfterRollbackProcessorConfiguration {
@Bean
AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
return (records, consumer, ex, recoverable, eosMode) -> {
return (records, consumer, container, ex, recoverable, eosMode) -> {
// no-op
};
}
......
......@@ -1648,7 +1648,7 @@ bom {
]
}
}
library("Spring Kafka", "2.7.0-M1") {
library("Spring Kafka", "2.7.0-SNAPSHOT") {
group("org.springframework.kafka") {
modules = [
"spring-kafka",
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment