GH-3090: Ensure client factory customizers are applied before transaction manager creation

Fixes: #3090

Issue: https://github.com/spring-cloud/spring-cloud-stream/issues/3090

Fixes issue where KafkaBinderConfiguration would add customizers after the
transaction manager was already created.

The KafkaMessageChannelBinder was initializing the transaction manager in the
constructor before client factory customizers were added, which meant the customizers
were never applied to the producer factory used by the transaction manager.

- Moves transaction manager initialization from the constructor to the onInit() method
- Leverages the existing InitializingBean lifecycle to ensure customizers are applied
  before the transaction manager is created
- Adds comprehensive test coverage to verify the fix in the binder and via auto-configuration

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>

Resolves #3091
This commit is contained in:
Soby Chacko
2025-03-06 17:24:16 -05:00
committed by Oleg Zhurakousky
parent 525a5c1220
commit d6cc07df97
4 changed files with 313 additions and 15 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2014-2024 the original author or authors.
* Copyright 2014-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -222,9 +222,9 @@ public class KafkaMessageChannelBinder extends
private final Map<String, TopicInformation> topicsInUse = new ConcurrentHashMap<>();
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
private KafkaTransactionManager<byte[], byte[]> transactionManager;
private final TransactionTemplate transactionTemplate;
private TransactionTemplate transactionTemplate;
private KafkaBindingRebalanceListener rebalanceListener;
@@ -278,23 +278,24 @@ public class KafkaMessageChannelBinder extends
super(headersToMap(configurationProperties), provisioningProvider,
containerCustomizer, sourceCustomizer);
this.configurationProperties = configurationProperties;
String txId = configurationProperties.getTransaction().getTransactionIdPrefix();
if (StringUtils.hasText(txId)) {
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
txId, new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension()), txId + ".producer", null));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
else {
this.transactionManager = null;
this.transactionTemplate = null;
}
this.rebalanceListener = rebalanceListener;
this.dlqPartitionFunction = dlqPartitionFunction;
this.dlqDestinationResolver = dlqDestinationResolver;
this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties()));
}
@Override
protected void onInit() throws Exception {
super.onInit();
String txId = this.configurationProperties.getTransaction().getTransactionIdPrefix();
if (StringUtils.hasText(txId)) {
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
txId, new ExtendedProducerProperties<>(configurationProperties
.getTransaction().getProducer().getExtension()), txId + ".producer", null));
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
}
}
private static String[] headersToMap(
KafkaBinderConfigurationProperties configurationProperties) {
String[] headersToMap;

View File

@@ -0,0 +1,109 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.util.ReflectionUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Soby Chacko
*/
@SpringBootTest(classes = { KafkaBinderConfiguration.class })
public class KafkaBinderConfigurationWithTransactionsTest {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class)
.withPropertyValues(
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=test-tx-",
"spring.kafka.bootstrap-servers=localhost:9092");
@Test
public void clientFactoryCustomizersAppliedToTransactionManager() {
contextRunner.withUserConfiguration(TransactionClientFactoryCustomizerConfig.class)
.run(context -> {
assertThat(context).hasSingleBean(KafkaMessageChannelBinder.class);
KafkaMessageChannelBinder kafkaMessageChannelBinder =
context.getBean(KafkaMessageChannelBinder.class);
Map<String, ClientFactoryCustomizer> customizers =
context.getBeansOfType(ClientFactoryCustomizer.class);
assertThat(customizers).hasSize(1);
Field transactionManagerField = ReflectionUtils.findField(
KafkaMessageChannelBinder.class, "transactionManager",
KafkaTransactionManager.class);
assertThat(transactionManagerField).isNotNull();
ReflectionUtils.makeAccessible(transactionManagerField);
KafkaTransactionManager<?, ?> transactionManager =
(KafkaTransactionManager<?, ?>) ReflectionUtils.getField(
transactionManagerField, kafkaMessageChannelBinder);
assertThat(transactionManager).isNotNull();
ProducerFactory<?, ?> producerFactory = transactionManager.getProducerFactory();
assertThat(producerFactory).isNotNull();
// Verify customizer was applied - check if our flag was set
TransactionClientFactoryCustomizerConfig config =
context.getBean(TransactionClientFactoryCustomizerConfig.class);
assertThat(config.wasCustomizerApplied()).isTrue();
assertThat(config.getCustomizedFactories()).contains(producerFactory);
});
}
@Configuration
static class TransactionClientFactoryCustomizerConfig {
private final List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
private boolean customizerApplied = false;
@Bean
ClientFactoryCustomizer testClientFactoryCustomizer() {
return new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizerApplied = true;
customizedFactories.add(pf);
}
};
}
public boolean wasCustomizerApplied() {
return customizerApplied;
}
public List<ProducerFactory<?, ?>> getCustomizedFactories() {
return customizedFactories;
}
}
}

View File

@@ -0,0 +1,184 @@
/*
* Copyright 2025-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.stream.binder.kafka;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.TestUtils;
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.retry.support.RetryTemplate;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
* @author Soby Chacko
*/
@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1",
"transaction.state.log.min.isr=1"})
class KafkaBinderTransactionCustomizerTest {
private static EmbeddedKafkaBroker embeddedKafka;
@BeforeAll
public static void setup() {
embeddedKafka = EmbeddedKafkaCondition.getBroker();
}
@SuppressWarnings("unchecked")
@Test
void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties, mock(ObjectProvider.class));
configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties, prop -> {
});
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
// Create a tracking list for customized factories
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
// Create a customizer that we'll register after the binder is created
ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizedFactories.add(pf);
}
};
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider));
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
// Add the customizer AFTER binder creation but BEFORE afterPropertiesSet
binder.addClientFactoryCustomizer(customizer);
// Now initialize the binder (this triggers onInit)
binder.afterPropertiesSet();
// Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit
verify(binder).getProducerFactory(
eq("custom-tx-"),
any(ExtendedProducerProperties.class),
eq("custom-tx-.producer"),
isNull());
// Verify customizer was applied
assertThat(customizedFactories).isNotEmpty();
// Verify that the producer factory from the transaction manager is in our list of customized factories
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
TestUtils.getPropertyValue(binder, "transactionManager");
assertThat(txManager).isNotNull();
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
// This verifies that the same producer factory that was customized is used for the transaction manager
assertThat(customizedFactories).contains(producerFactory);
}
@SuppressWarnings("unchecked")
@Test
void multipleCustomizersAppliedInOrder() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getBrokersAsString()));
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
kafkaProperties, mock(ObjectProvider.class));
configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-");
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
configurationProperties, kafkaProperties, prop -> {
});
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
// Track order of customizers and customized factories
List<String> customizationOrder = new ArrayList<>();
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
configurationProperties, provisioningProvider));
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
// Add multiple customizers
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer1");
customizedFactories.add(pf);
}
});
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer2");
}
});
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
@Override
public void configure(ProducerFactory<?, ?> pf) {
customizationOrder.add("customizer3");
}
});
binder.afterPropertiesSet();
assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3");
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
TestUtils.getPropertyValue(binder, "transactionManager");
assertThat(txManager).isNotNull();
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
// Verify that the producer factory used in transaction manager is one that was customized
assertThat(customizedFactories).contains(producerFactory);
}
}

View File

@@ -71,7 +71,7 @@ class KafkaTransactionTests {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void producerRunsInTx() {
void producerRunsInTx() throws Exception {
KafkaProperties kafkaProperties = new TestKafkaProperties();
kafkaProperties.setBootstrapServers(Collections
.singletonList(embeddedKafka.getBrokersAsString()));
@@ -111,6 +111,10 @@ class KafkaTransactionTests {
GenericApplicationContext applicationContext = new GenericApplicationContext();
applicationContext.refresh();
binder.setApplicationContext(applicationContext);
// Important: Initialize the binder to trigger onInit()
binder.afterPropertiesSet();
DirectChannel channel = new DirectChannel();
KafkaProducerProperties extension = new KafkaProducerProperties();
ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(