From d6cc07df97b44e08d9d80e246dfd369d448914ab Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 6 Mar 2025 17:24:16 -0500 Subject: [PATCH] GH-3090: Ensure client factory customizers are applied before transaction manager creation Fixes: #3090 Issue: https://github.com/spring-cloud/spring-cloud-stream/issues/3090 Fixes issue where KafkaBinderConfiguration would add customizers after the transaction manager was already created. The KafkaMessageChannelBinder was initializing the transaction manager in the constructor before client factory customizers were added, which meant the customizers were never applied to the producer factory used by the transaction manager. - Moves transaction manager initialization from the constructor to the onInit() method - Leverages the existing InitializingBean lifecycle to ensure customizers are applied before the transaction manager is created - Adds comprehensive test coverage to verify the fix in the binder and via auto-configuration Signed-off-by: Soby Chacko Resolves #3091 --- .../kafka/KafkaMessageChannelBinder.java | 29 +-- ...nderConfigurationWithTransactionsTest.java | 109 +++++++++++ .../KafkaBinderTransactionCustomizerTest.java | 184 ++++++++++++++++++ .../binder/kafka/KafkaTransactionTests.java | 6 +- 4 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index f4d4ee74f..e5adfe503 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -222,9 +222,9 @@ public class KafkaMessageChannelBinder extends private final Map topicsInUse = new ConcurrentHashMap<>(); - private final KafkaTransactionManager transactionManager; + private KafkaTransactionManager transactionManager; - private final TransactionTemplate transactionTemplate; + private TransactionTemplate transactionTemplate; private KafkaBindingRebalanceListener rebalanceListener; @@ -278,23 +278,24 @@ public class KafkaMessageChannelBinder extends super(headersToMap(configurationProperties), provisioningProvider, containerCustomizer, sourceCustomizer); this.configurationProperties = configurationProperties; - String txId = configurationProperties.getTransaction().getTransactionIdPrefix(); - if (StringUtils.hasText(txId)) { - this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( - txId, new ExtendedProducerProperties<>(configurationProperties - .getTransaction().getProducer().getExtension()), txId + ".producer", null)); - this.transactionTemplate = new TransactionTemplate(this.transactionManager); - } - else { - this.transactionManager = null; - this.transactionTemplate = null; - } this.rebalanceListener = rebalanceListener; this.dlqPartitionFunction = dlqPartitionFunction; this.dlqDestinationResolver = dlqDestinationResolver; this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties())); } + @Override + protected void onInit() throws Exception { + super.onInit(); + String txId = this.configurationProperties.getTransaction().getTransactionIdPrefix(); + if (StringUtils.hasText(txId)) { + this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( + txId, new ExtendedProducerProperties<>(configurationProperties + .getTransaction().getProducer().getExtension()), txId + ".producer", null)); + this.transactionTemplate = new TransactionTemplate(this.transactionManager); + } + } + private static String[] headersToMap( KafkaBinderConfigurationProperties configurationProperties) { String[] headersToMap; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java new file mode 100644 index 000000000..c06b8a1ba --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + */ +@SpringBootTest(classes = { KafkaBinderConfiguration.class }) +public class KafkaBinderConfigurationWithTransactionsTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class) + .withPropertyValues( + "spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=test-tx-", + "spring.kafka.bootstrap-servers=localhost:9092"); + + @Test + public void clientFactoryCustomizersAppliedToTransactionManager() { + contextRunner.withUserConfiguration(TransactionClientFactoryCustomizerConfig.class) + .run(context -> { + assertThat(context).hasSingleBean(KafkaMessageChannelBinder.class); + KafkaMessageChannelBinder kafkaMessageChannelBinder = + context.getBean(KafkaMessageChannelBinder.class); + + Map customizers = + context.getBeansOfType(ClientFactoryCustomizer.class); + assertThat(customizers).hasSize(1); + + Field transactionManagerField = ReflectionUtils.findField( + KafkaMessageChannelBinder.class, "transactionManager", + KafkaTransactionManager.class); + assertThat(transactionManagerField).isNotNull(); + ReflectionUtils.makeAccessible(transactionManagerField); + KafkaTransactionManager transactionManager = + (KafkaTransactionManager) ReflectionUtils.getField( + transactionManagerField, kafkaMessageChannelBinder); + + assertThat(transactionManager).isNotNull(); + + ProducerFactory producerFactory = transactionManager.getProducerFactory(); + assertThat(producerFactory).isNotNull(); + + // Verify customizer was applied - check if our flag was set + TransactionClientFactoryCustomizerConfig config = + context.getBean(TransactionClientFactoryCustomizerConfig.class); + assertThat(config.wasCustomizerApplied()).isTrue(); + assertThat(config.getCustomizedFactories()).contains(producerFactory); + }); + } + + @Configuration + static class TransactionClientFactoryCustomizerConfig { + private final List> customizedFactories = new ArrayList<>(); + private boolean customizerApplied = false; + + @Bean + ClientFactoryCustomizer testClientFactoryCustomizer() { + return new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizerApplied = true; + customizedFactories.add(pf); + } + }; + } + + public boolean wasCustomizerApplied() { + return customizerApplied; + } + + public List> getCustomizedFactories() { + return customizedFactories; + } + } +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java new file mode 100644 index 000000000..e99e68829 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.TestUtils; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.retry.support.RetryTemplate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * @author Soby Chacko + */ +@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1"}) +class KafkaBinderTransactionCustomizerTest { + + private static EmbeddedKafkaBroker embeddedKafka; + + @BeforeAll + public static void setup() { + embeddedKafka = EmbeddedKafkaCondition.getBroker(); + } + + @SuppressWarnings("unchecked") + @Test + void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Create a tracking list for customized factories + List> customizedFactories = new ArrayList<>(); + + // Create a customizer that we'll register after the binder is created + ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizedFactories.add(pf); + } + }; + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add the customizer AFTER binder creation but BEFORE afterPropertiesSet + binder.addClientFactoryCustomizer(customizer); + + // Now initialize the binder (this triggers onInit) + binder.afterPropertiesSet(); + + // Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit + verify(binder).getProducerFactory( + eq("custom-tx-"), + any(ExtendedProducerProperties.class), + eq("custom-tx-.producer"), + isNull()); + + // Verify customizer was applied + assertThat(customizedFactories).isNotEmpty(); + + // Verify that the producer factory from the transaction manager is in our list of customized factories + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // This verifies that the same producer factory that was customized is used for the transaction manager + assertThat(customizedFactories).contains(producerFactory); + } + + @SuppressWarnings("unchecked") + @Test + void multipleCustomizersAppliedInOrder() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Track order of customizers and customized factories + List customizationOrder = new ArrayList<>(); + List> customizedFactories = new ArrayList<>(); + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add multiple customizers + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer1"); + customizedFactories.add(pf); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer2"); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer3"); + } + }); + + binder.afterPropertiesSet(); + + assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3"); + + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // Verify that the producer factory used in transaction manager is one that was customized + assertThat(customizedFactories).contains(producerFactory); + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java index 25a65a10b..1c018396f 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java @@ -71,7 +71,7 @@ class KafkaTransactionTests { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - void producerRunsInTx() { + void producerRunsInTx() throws Exception { KafkaProperties kafkaProperties = new TestKafkaProperties(); kafkaProperties.setBootstrapServers(Collections .singletonList(embeddedKafka.getBrokersAsString())); @@ -111,6 +111,10 @@ class KafkaTransactionTests { GenericApplicationContext applicationContext = new GenericApplicationContext(); applicationContext.refresh(); binder.setApplicationContext(applicationContext); + + // Important: Initialize the binder to trigger onInit() + binder.afterPropertiesSet(); + DirectChannel channel = new DirectChannel(); KafkaProducerProperties extension = new KafkaProducerProperties(); ExtendedProducerProperties properties = new ExtendedProducerProperties<>(