From 04769ca5c794075140bc75e644eebb6c91fa5e1e Mon Sep 17 00:00:00 2001 From: Marcin Grzejszczak Date: Tue, 23 Nov 2021 08:58:46 +0100 Subject: [PATCH] Allows providing a custom KafkaTemplate bean; fixes gh-1704 --- .../asciidoc/_project-features-messaging.adoc | 2 ++ .../ContractVerifierKafkaConfiguration.java | 11 +++++++-- .../messaging/kafka/KafkaStubMessages.java | 2 +- ...ntractVerifierKafkaConfigurationTests.java | 23 +++++++++++++++++++ 4 files changed, 35 insertions(+), 3 deletions(-) diff --git a/docs/src/main/asciidoc/_project-features-messaging.adoc b/docs/src/main/asciidoc/_project-features-messaging.adoc index 305db71386..9ac2539e80 100644 --- a/docs/src/main/asciidoc/_project-features-messaging.adoc +++ b/docs/src/main/asciidoc/_project-features-messaging.adoc @@ -1190,6 +1190,8 @@ with `@AutoConfigureStubRunner`. With Kafka integration, in order to poll for a single message, we need to register a consumer upon Spring context startup. That may lead to a situation that, when you are on the consumer side, Stub Runner can register an additional consumer for the same group ID and topic. That could lead to a situation that only one of the components would actually poll for the message. Since, on the consumer side, you have both the Spring Cloud Contract Stub Runner and Spring Cloud Contract Verifier classpath, we need to be able to switch off such behavior. That is done automatically through the `stubrunner.kafka.initializer.enabled` flag, which disables the Contact Verifier consumer registration. If your application is both the consumer and the producer of a Kafka message, you might need to manually toggle that property to `false` in the base class of your generated tests. +If you have multiple `KafkaTemplate` beans, you can provide your own bean of `Supplier` type that returns the `KafkaTemplate` of your chosing. + :input_name: input :output_name: output diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfiguration.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfiguration.java index b513e34f99..3ad1e9ab6a 100644 --- a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfiguration.java +++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfiguration.java @@ -19,6 +19,7 @@ package org.springframework.cloud.contract.verifier.messaging.kafka; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,9 +56,15 @@ public class ContractVerifierKafkaConfiguration { @Bean @ConditionalOnMissingBean - MessageVerifier> contractVerifierKafkaMessageExchange(KafkaTemplate kafkaTemplate, + MessageVerifier> contractVerifierKafkaMessageExchange(Supplier kafkaTemplate, EmbeddedKafkaBroker broker, KafkaProperties kafkaProperties, KafkaStubMessagesInitializer initializer) { - return new KafkaStubMessages(kafkaTemplate, broker, kafkaProperties, initializer); + return new KafkaStubMessages(kafkaTemplate.get(), broker, kafkaProperties, initializer); + } + + @Bean + @ConditionalOnMissingBean + Supplier contractVerifierKafkaTemplateSupplier(KafkaTemplate kafkaTemplate) { + return () -> kafkaTemplate; } @Bean diff --git a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java index 489cdad2be..8d6de0f085 100644 --- a/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java +++ b/spring-cloud-contract-verifier/src/main/java/org/springframework/cloud/contract/verifier/messaging/kafka/KafkaStubMessages.java @@ -46,7 +46,7 @@ class KafkaStubMessages implements MessageVerifier> { private static final Log log = LogFactory.getLog(KafkaStubMessages.class); - private final KafkaTemplate kafkaTemplate; + final KafkaTemplate kafkaTemplate; private final Receiver receiver; diff --git a/spring-cloud-contract-verifier/src/test/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfigurationTests.java b/spring-cloud-contract-verifier/src/test/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfigurationTests.java index a536868cf2..919eb34814 100644 --- a/spring-cloud-contract-verifier/src/test/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfigurationTests.java +++ b/spring-cloud-contract-verifier/src/test/java/org/springframework/cloud/contract/verifier/messaging/kafka/ContractVerifierKafkaConfigurationTests.java @@ -16,12 +16,16 @@ package org.springframework.cloud.contract.verifier.messaging.kafka; +import java.util.function.Supplier; + import org.junit.Test; +import org.mockito.BDDMockito; import org.springframework.boot.autoconfigure.AutoConfigurations; import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.EmbeddedKafkaBroker; import static org.assertj.core.api.Assertions.assertThat; @@ -44,6 +48,14 @@ public class ContractVerifierKafkaConfigurationTests { }); } + @Test + public void shouldPickCustomKafkaTemplate() { + this.contextRunner.withUserConfiguration(CustomKafkaTemplateConfiguration.class).run((context) -> { + assertThat(context.getBeansOfType(KafkaStubMessages.class)).hasSize(1); + assertThat(context.getBean(KafkaStubMessages.class).kafkaTemplate).isSameAs(context.getBean(CustomKafkaTemplateConfiguration.class).myKafkaTemplate); + }); + } + @Test public void shouldNotCreateBeansWhenDisabled() { this.contextRunner.withPropertyValues("stubrunner.kafka.enabled=false").run((context) -> { @@ -69,4 +81,15 @@ public class ContractVerifierKafkaConfigurationTests { } + static class CustomKafkaTemplateConfiguration { + + KafkaTemplate myKafkaTemplate = BDDMockito.mock(KafkaTemplate.class); + + @Bean + Supplier myKafkaTemplate() { + return () -> myKafkaTemplate; + } + + } + }