GH-3086 Define explicit defineInput/Output binding methods

Also changed signature to return properties for convinience
Added test
This commit is contained in:
Oleg Zhurakousky
2025-04-04 15:21:58 +02:00
parent 93d4c21757
commit da4d4c5b46
2 changed files with 57 additions and 20 deletions

View File

@@ -38,6 +38,7 @@ import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties;
import org.springframework.cloud.stream.binding.BindingsLifecycleController;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -75,6 +76,18 @@ class KafkaBinderExtendedPropertiesTest {
@Autowired
private ConfigurableApplicationContext context;
@Test
void testDefiningNewBindingAndSettingItsProperties() throws Exception {
BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class);
KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding");
boolean isAutoRebalanceEnabled = consumerProperties.isAutoRebalanceEnabled();
assertThat(isAutoRebalanceEnabled).isTrue();
consumerProperties.setAutoRebalanceEnabled(false);
consumerProperties = controller.getExtensionProperties("test-input-binding-in-0");
isAutoRebalanceEnabled = consumerProperties.isAutoRebalanceEnabled();
assertThat(isAutoRebalanceEnabled).isFalse();
}
@Test
void kafkaBinderExtendedProperties() throws Exception {
@@ -140,7 +153,7 @@ class KafkaBinderExtendedPropertiesTest {
assertThat(rebalanceListener.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(rebalanceListener.bindings.keySet()).contains("standard-in",
"custom-in");
assertThat(rebalanceListener.bindings.values()).containsExactly(Boolean.TRUE,
assertThat(rebalanceListener.bindings.values()).contains(Boolean.TRUE,
Boolean.TRUE);
}

View File

@@ -81,26 +81,30 @@ public class BindingsLifecycleController implements ApplicationContextAware {
}
}
public void defineBinding(String bindingName, boolean isInputBinding) {
BindableFunctionProxyFactory bindingProxyFactory = new BindableFunctionProxyFactory(bindingName,
isInputBinding ? 1 : 0, isInputBinding ? 0 : 1, this.applicationContext.getBean(StreamFunctionProperties.class));
bindingProxyFactory.setApplicationContext(this.applicationContext);
/**
* Allows to dynamically define a new input binding returning its consumer properties for further customization.
* @param <P> the type of consumer properties. For example, if binding derives from Kafka, it will return KafkaConsumerProperties.
* @param bindingName the name of the binding.
* @return instance of the consumer properties.
*/
public <P> P defineInputBinding(String bindingName) {
BindableFunctionProxyFactory bindingProxyFactory =
new BindableFunctionProxyFactory(bindingName, 1, 0, this.applicationContext.getBean(StreamFunctionProperties.class));
this.defineBinding(bindingProxyFactory);
return this.getExtensionProperties(bindingName + "-in-0");
}
bindingProxyFactory.afterPropertiesSet();
BindingService bindingService = this.applicationContext.getBean(BindingService.class);
AbstractBindingLifecycle bindingLifecycle;
if (isInputBinding) {
bindingProxyFactory.createAndBindInputs(bindingService);
bindingLifecycle = this.applicationContext.getBean(InputBindingLifecycle.class);
}
else {
bindingProxyFactory.createAndBindOutputs(bindingService);
bindingLifecycle = this.applicationContext.getBean(OutputBindingLifecycle.class);
}
bindingLifecycle.startBindable(bindingProxyFactory);
/**
* Allows to dynamically define a new input binding returning its producer properties for further customization.
* @param <P> the type of producer properties. For example, if binding derives from Kafka, it will return KafkaProducerProperties.
* @param bindingName the name of the binding.
* @return instance of the producer properties.
*/
public <P> P defineOutputBinding(String bindingName) {
BindableFunctionProxyFactory bindingProxyFactory =
new BindableFunctionProxyFactory(bindingName, 0, 1, this.applicationContext.getBean(StreamFunctionProperties.class));
this.defineBinding(bindingProxyFactory);
return this.getExtensionProperties(bindingName + "-out-0");
}
/**
@@ -207,6 +211,26 @@ public class BindingsLifecycleController implements ApplicationContextAware {
this.applicationContext = applicationContext;
}
private void defineBinding(BindableFunctionProxyFactory bindingProxyFactory) {
bindingProxyFactory.setApplicationContext(this.applicationContext);
bindingProxyFactory.afterPropertiesSet();
BindingService bindingService = this.applicationContext.getBean(BindingService.class);
AbstractBindingLifecycle bindingLifecycle;
if (bindingProxyFactory.getInputs().size() > 0) {
bindingProxyFactory.createAndBindInputs(bindingService);
bindingLifecycle = this.applicationContext.getBean(InputBindingLifecycle.class);
}
else {
bindingProxyFactory.createAndBindOutputs(bindingService);
bindingLifecycle = this.applicationContext.getBean(OutputBindingLifecycle.class);
}
bindingLifecycle.startBindable(bindingProxyFactory);
}
/**
* Queries for all input {@link Binding}s.
* @return the list of input {@link Binding}s