Allows providing a custom KafkaTemplate bean; fixes gh-1704

This commit is contained in:
Marcin Grzejszczak
2021-11-23 08:58:46 +01:00
parent aa5e8a5145
commit 04769ca5c7
4 changed files with 35 additions and 3 deletions

View File

@@ -1190,6 +1190,8 @@ with `@AutoConfigureStubRunner`.
With Kafka integration, in order to poll for a single message, we need to register a consumer upon Spring context startup. That may lead to a situation that, when you are on the consumer side, Stub Runner can register an additional consumer for the same group ID and topic. That could lead to a situation that only one of the components would actually poll for the message. Since, on the consumer side, you have both the Spring Cloud Contract Stub Runner and Spring Cloud Contract Verifier classpath, we need to be able to switch off such behavior. That is done automatically through the `stubrunner.kafka.initializer.enabled` flag, which disables the Contact Verifier consumer registration. If your application is both the consumer and the producer of a Kafka message, you might need to manually toggle that property to `false` in the base class of your generated tests.
If you have multiple `KafkaTemplate` beans, you can provide your own bean of `Supplier<KafkaTemplate>` type that returns the `KafkaTemplate` of your chosing.
:input_name: input
:output_name: output

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.contract.verifier.messaging.kafka;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,9 +56,15 @@ public class ContractVerifierKafkaConfiguration {
@Bean
@ConditionalOnMissingBean
MessageVerifier<Message<?>> contractVerifierKafkaMessageExchange(KafkaTemplate kafkaTemplate,
MessageVerifier<Message<?>> contractVerifierKafkaMessageExchange(Supplier<KafkaTemplate> kafkaTemplate,
EmbeddedKafkaBroker broker, KafkaProperties kafkaProperties, KafkaStubMessagesInitializer initializer) {
return new KafkaStubMessages(kafkaTemplate, broker, kafkaProperties, initializer);
return new KafkaStubMessages(kafkaTemplate.get(), broker, kafkaProperties, initializer);
}
@Bean
@ConditionalOnMissingBean
Supplier<KafkaTemplate> contractVerifierKafkaTemplateSupplier(KafkaTemplate kafkaTemplate) {
return () -> kafkaTemplate;
}
@Bean

View File

@@ -46,7 +46,7 @@ class KafkaStubMessages implements MessageVerifier<Message<?>> {
private static final Log log = LogFactory.getLog(KafkaStubMessages.class);
private final KafkaTemplate kafkaTemplate;
final KafkaTemplate kafkaTemplate;
private final Receiver receiver;

View File

@@ -16,12 +16,16 @@
package org.springframework.cloud.contract.verifier.messaging.kafka;
import java.util.function.Supplier;
import org.junit.Test;
import org.mockito.BDDMockito;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import static org.assertj.core.api.Assertions.assertThat;
@@ -44,6 +48,14 @@ public class ContractVerifierKafkaConfigurationTests {
});
}
@Test
public void shouldPickCustomKafkaTemplate() {
this.contextRunner.withUserConfiguration(CustomKafkaTemplateConfiguration.class).run((context) -> {
assertThat(context.getBeansOfType(KafkaStubMessages.class)).hasSize(1);
assertThat(context.getBean(KafkaStubMessages.class).kafkaTemplate).isSameAs(context.getBean(CustomKafkaTemplateConfiguration.class).myKafkaTemplate);
});
}
@Test
public void shouldNotCreateBeansWhenDisabled() {
this.contextRunner.withPropertyValues("stubrunner.kafka.enabled=false").run((context) -> {
@@ -69,4 +81,15 @@ public class ContractVerifierKafkaConfigurationTests {
}
static class CustomKafkaTemplateConfiguration {
KafkaTemplate myKafkaTemplate = BDDMockito.mock(KafkaTemplate.class);
@Bean
Supplier<KafkaTemplate> myKafkaTemplate() {
return () -> myKafkaTemplate;
}
}
}