GH-2265 Fix support for creating explicit bindings
This commit ensures that in the event there is a Consumer or Supplier the output-bidning or input-binding is still created Resolves #2265
This commit is contained in:
@@ -185,6 +185,9 @@ public class FunctionConfiguration {
|
||||
if (functionWrapper != null && functionWrapper.isSupplier()) {
|
||||
// gather output content types
|
||||
List<String> contentTypes = new ArrayList<String>();
|
||||
if (proxyFactory.getOutputs().size() == 0) {
|
||||
return;
|
||||
}
|
||||
Assert.isTrue(proxyFactory.getOutputs().size() == 1, "Supplier with multiple outputs is not supported at the moment.");
|
||||
String outputName = proxyFactory.getOutputs().iterator().next();
|
||||
|
||||
@@ -552,11 +555,13 @@ public class FunctionConfiguration {
|
||||
}
|
||||
else {
|
||||
String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, functionType);
|
||||
String inputDestinationName = inputBindingNames.iterator().next();
|
||||
Object inputDestination = this.applicationContext.getBean(inputDestinationName);
|
||||
if (inputDestination != null && inputDestination instanceof SubscribableChannel) {
|
||||
AbstractMessageHandler handler = createFunctionHandler(function, inputDestinationName, outputDestinationName);
|
||||
((SubscribableChannel) inputDestination).subscribe(handler);
|
||||
if (!ObjectUtils.isEmpty(inputBindingNames)) {
|
||||
String inputDestinationName = inputBindingNames.iterator().next();
|
||||
Object inputDestination = this.applicationContext.getBean(inputDestinationName);
|
||||
if (inputDestination != null && inputDestination instanceof SubscribableChannel) {
|
||||
AbstractMessageHandler handler = createFunctionHandler(function, inputDestinationName, outputDestinationName);
|
||||
((SubscribableChannel) inputDestination).subscribe(handler);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -866,13 +871,14 @@ public class FunctionConfiguration {
|
||||
FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(inputBindingName);
|
||||
|
||||
if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229
|
||||
sourceFunc.isSupplier() ||
|
||||
(!sourceFunc.getFunctionDefinition().equals(inputBindingName) && applicationContext.containsBean(inputBindingName))) {
|
||||
RootBeanDefinition functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(inputBindingName);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(1);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(0);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);
|
||||
registry.registerBeanDefinition(inputBindingName + "_binding", functionBindableProxyDefinition);
|
||||
registry.registerBeanDefinition(inputBindingName + "_binding_in", functionBindableProxyDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -880,13 +886,14 @@ public class FunctionConfiguration {
|
||||
FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(outputBindingName);
|
||||
|
||||
if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229
|
||||
sourceFunc.isConsumer() ||
|
||||
(!sourceFunc.getFunctionDefinition().equals(outputBindingName) && applicationContext.containsBean(outputBindingName))) {
|
||||
RootBeanDefinition functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(outputBindingName);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(0);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(1);
|
||||
functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties);
|
||||
registry.registerBeanDefinition(outputBindingName + "_binding", functionBindableProxyDefinition);
|
||||
registry.registerBeanDefinition(outputBindingName + "_binding_out", functionBindableProxyDefinition);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,9 @@
|
||||
|
||||
package org.springframework.cloud.stream.binding;
|
||||
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
@@ -23,6 +26,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
@@ -50,9 +54,55 @@ public class ExplicitBindingTests {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplicitBindingsWithExistingConsumer() {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
TestChannelBinderConfiguration.getCompleteConfiguration(ConsumerConfiguration.class))
|
||||
.web(WebApplicationType.NONE)
|
||||
.run("--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.output-bindings=consume")) {
|
||||
|
||||
assertThat(context.getBean("consume-in-0", MessageChannel.class)).isNotNull();
|
||||
assertThat(context.getBean("consume-out-0", MessageChannel.class)).isNotNull();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExplicitBindingsWithExistingSupplier() {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
TestChannelBinderConfiguration.getCompleteConfiguration(SupplierConfiguration.class))
|
||||
.web(WebApplicationType.NONE)
|
||||
.run("--spring.jmx.enabled=false",
|
||||
"--spring.cloud.stream.input-bindings=supply")) {
|
||||
|
||||
assertThat(context.getBean("supply-in-0", MessageChannel.class)).isNotNull();
|
||||
assertThat(context.getBean("supply-out-0", MessageChannel.class)).isNotNull();
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class EmptyConfiguration {
|
||||
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class ConsumerConfiguration {
|
||||
|
||||
@Bean
|
||||
public Consumer<String> consume() {
|
||||
return System.out::println;
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class SupplierConfiguration {
|
||||
|
||||
@Bean
|
||||
public Supplier<String> supply() {
|
||||
return () -> "hello";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user