diff --git a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java index 04178e058..e5205d9a3 100644 --- a/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java +++ b/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java @@ -185,6 +185,9 @@ public class FunctionConfiguration { if (functionWrapper != null && functionWrapper.isSupplier()) { // gather output content types List contentTypes = new ArrayList(); + if (proxyFactory.getOutputs().size() == 0) { + return; + } Assert.isTrue(proxyFactory.getOutputs().size() == 1, "Supplier with multiple outputs is not supported at the moment."); String outputName = proxyFactory.getOutputs().iterator().next(); @@ -552,11 +555,13 @@ public class FunctionConfiguration { } else { String outputDestinationName = this.determineOutputDestinationName(0, bindableProxyFactory, functionType); - String inputDestinationName = inputBindingNames.iterator().next(); - Object inputDestination = this.applicationContext.getBean(inputDestinationName); - if (inputDestination != null && inputDestination instanceof SubscribableChannel) { - AbstractMessageHandler handler = createFunctionHandler(function, inputDestinationName, outputDestinationName); - ((SubscribableChannel) inputDestination).subscribe(handler); + if (!ObjectUtils.isEmpty(inputBindingNames)) { + String inputDestinationName = inputBindingNames.iterator().next(); + Object inputDestination = this.applicationContext.getBean(inputDestinationName); + if (inputDestination != null && inputDestination instanceof SubscribableChannel) { + AbstractMessageHandler handler = createFunctionHandler(function, inputDestinationName, outputDestinationName); + ((SubscribableChannel) inputDestination).subscribe(handler); + } } } } @@ -866,13 +871,14 @@ public class FunctionConfiguration { FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(inputBindingName); if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229 + sourceFunc.isSupplier() || (!sourceFunc.getFunctionDefinition().equals(inputBindingName) && applicationContext.containsBean(inputBindingName))) { RootBeanDefinition functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(inputBindingName); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(1); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(0); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties); - registry.registerBeanDefinition(inputBindingName + "_binding", functionBindableProxyDefinition); + registry.registerBeanDefinition(inputBindingName + "_binding_in", functionBindableProxyDefinition); } } @@ -880,13 +886,14 @@ public class FunctionConfiguration { FunctionInvocationWrapper sourceFunc = functionCatalog.lookup(outputBindingName); if (sourceFunc == null || //see https://github.com/spring-cloud/spring-cloud-stream/issues/2229 + sourceFunc.isConsumer() || (!sourceFunc.getFunctionDefinition().equals(outputBindingName) && applicationContext.containsBean(outputBindingName))) { RootBeanDefinition functionBindableProxyDefinition = new RootBeanDefinition(BindableFunctionProxyFactory.class); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(outputBindingName); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(0); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(1); functionBindableProxyDefinition.getConstructorArgumentValues().addGenericArgumentValue(this.streamFunctionProperties); - registry.registerBeanDefinition(outputBindingName + "_binding", functionBindableProxyDefinition); + registry.registerBeanDefinition(outputBindingName + "_binding_out", functionBindableProxyDefinition); } } diff --git a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/ExplicitBindingTests.java b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/ExplicitBindingTests.java index 5ab5f9bd5..8d3788844 100644 --- a/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/ExplicitBindingTests.java +++ b/spring-cloud-stream/src/test/java/org/springframework/cloud/stream/binding/ExplicitBindingTests.java @@ -16,6 +16,9 @@ package org.springframework.cloud.stream.binding; +import java.util.function.Consumer; +import java.util.function.Supplier; + import org.junit.jupiter.api.Test; import org.springframework.boot.WebApplicationType; @@ -23,6 +26,7 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.MessageChannel; @@ -50,9 +54,55 @@ public class ExplicitBindingTests { } } + @Test + public void testExplicitBindingsWithExistingConsumer() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration(ConsumerConfiguration.class)) + .web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false", + "--spring.cloud.stream.output-bindings=consume")) { + + assertThat(context.getBean("consume-in-0", MessageChannel.class)).isNotNull(); + assertThat(context.getBean("consume-out-0", MessageChannel.class)).isNotNull(); + } + } + + @Test + public void testExplicitBindingsWithExistingSupplier() { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder( + TestChannelBinderConfiguration.getCompleteConfiguration(SupplierConfiguration.class)) + .web(WebApplicationType.NONE) + .run("--spring.jmx.enabled=false", + "--spring.cloud.stream.input-bindings=supply")) { + + assertThat(context.getBean("supply-in-0", MessageChannel.class)).isNotNull(); + assertThat(context.getBean("supply-out-0", MessageChannel.class)).isNotNull(); + } + } + @EnableAutoConfiguration @Configuration public static class EmptyConfiguration { } + + @EnableAutoConfiguration + @Configuration + public static class ConsumerConfiguration { + + @Bean + public Consumer consume() { + return System.out::println; + } + } + + @EnableAutoConfiguration + @Configuration + public static class SupplierConfiguration { + + @Bean + public Supplier supply() { + return () -> "hello"; + } + } }