diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java index bb89e6caf..e860d9ed7 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/integration/KafkaBinderExtendedPropertiesTest.java @@ -38,6 +38,7 @@ import org.springframework.cloud.stream.binder.ProducerProperties; import org.springframework.cloud.stream.binder.kafka.KafkaBindingRebalanceListener; import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties; import org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties; +import org.springframework.cloud.stream.binding.BindingsLifecycleController; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -75,6 +76,18 @@ class KafkaBinderExtendedPropertiesTest { @Autowired private ConfigurableApplicationContext context; + @Test + void testDefiningNewBindingAndSettingItsProperties() throws Exception { + BindingsLifecycleController controller = context.getBean(BindingsLifecycleController.class); + KafkaConsumerProperties consumerProperties = controller.defineInputBinding("test-input-binding"); + boolean isAutoRebalanceEnabled = consumerProperties.isAutoRebalanceEnabled(); + assertThat(isAutoRebalanceEnabled).isTrue(); + consumerProperties.setAutoRebalanceEnabled(false); + consumerProperties = controller.getExtensionProperties("test-input-binding-in-0"); + isAutoRebalanceEnabled = consumerProperties.isAutoRebalanceEnabled(); + assertThat(isAutoRebalanceEnabled).isFalse(); + } + @Test void kafkaBinderExtendedProperties() throws Exception { @@ -140,7 +153,7 @@ class KafkaBinderExtendedPropertiesTest { assertThat(rebalanceListener.latch.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(rebalanceListener.bindings.keySet()).contains("standard-in", "custom-in"); - assertThat(rebalanceListener.bindings.values()).containsExactly(Boolean.TRUE, + assertThat(rebalanceListener.bindings.values()).contains(Boolean.TRUE, Boolean.TRUE); } diff --git a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java index 2a545ff55..0a11977db 100644 --- a/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java +++ b/core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binding/BindingsLifecycleController.java @@ -81,26 +81,30 @@ public class BindingsLifecycleController implements ApplicationContextAware { } } - public void defineBinding(String bindingName, boolean isInputBinding) { - BindableFunctionProxyFactory bindingProxyFactory = new BindableFunctionProxyFactory(bindingName, - isInputBinding ? 1 : 0, isInputBinding ? 0 : 1, this.applicationContext.getBean(StreamFunctionProperties.class)); - bindingProxyFactory.setApplicationContext(this.applicationContext); + /** + * Allows to dynamically define a new input binding returning its consumer properties for further customization. + * @param

the type of consumer properties. For example, if binding derives from Kafka, it will return KafkaConsumerProperties. + * @param bindingName the name of the binding. + * @return instance of the consumer properties. + */ + public

P defineInputBinding(String bindingName) { + BindableFunctionProxyFactory bindingProxyFactory = + new BindableFunctionProxyFactory(bindingName, 1, 0, this.applicationContext.getBean(StreamFunctionProperties.class)); + this.defineBinding(bindingProxyFactory); + return this.getExtensionProperties(bindingName + "-in-0"); + } - bindingProxyFactory.afterPropertiesSet(); - BindingService bindingService = this.applicationContext.getBean(BindingService.class); - - - AbstractBindingLifecycle bindingLifecycle; - if (isInputBinding) { - bindingProxyFactory.createAndBindInputs(bindingService); - bindingLifecycle = this.applicationContext.getBean(InputBindingLifecycle.class); - } - else { - bindingProxyFactory.createAndBindOutputs(bindingService); - bindingLifecycle = this.applicationContext.getBean(OutputBindingLifecycle.class); - } - - bindingLifecycle.startBindable(bindingProxyFactory); + /** + * Allows to dynamically define a new input binding returning its producer properties for further customization. + * @param

the type of producer properties. For example, if binding derives from Kafka, it will return KafkaProducerProperties. + * @param bindingName the name of the binding. + * @return instance of the producer properties. + */ + public

P defineOutputBinding(String bindingName) { + BindableFunctionProxyFactory bindingProxyFactory = + new BindableFunctionProxyFactory(bindingName, 0, 1, this.applicationContext.getBean(StreamFunctionProperties.class)); + this.defineBinding(bindingProxyFactory); + return this.getExtensionProperties(bindingName + "-out-0"); } /** @@ -207,6 +211,26 @@ public class BindingsLifecycleController implements ApplicationContextAware { this.applicationContext = applicationContext; } + private void defineBinding(BindableFunctionProxyFactory bindingProxyFactory) { + bindingProxyFactory.setApplicationContext(this.applicationContext); + bindingProxyFactory.afterPropertiesSet(); + + BindingService bindingService = this.applicationContext.getBean(BindingService.class); + + AbstractBindingLifecycle bindingLifecycle; + if (bindingProxyFactory.getInputs().size() > 0) { + bindingProxyFactory.createAndBindInputs(bindingService); + bindingLifecycle = this.applicationContext.getBean(InputBindingLifecycle.class); + } + else { + bindingProxyFactory.createAndBindOutputs(bindingService); + bindingLifecycle = this.applicationContext.getBean(OutputBindingLifecycle.class); + } + + bindingLifecycle.startBindable(bindingProxyFactory); + } + + /** * Queries for all input {@link Binding}s. * @return the list of input {@link Binding}s