diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index e2b1ecab8..d1d5c9593 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -859,7 +859,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } } else { //if (rawType instanceof Class) { // see AWS adapter with WildardTypeImpl and Azure with Voids - convertedValue = this.convertNonMessageInputIfNecessary(type, value); + convertedValue = FunctionTypeUtils.isPublisher(type) + ? this.convertNonMessageInputIfNecessary(rawType, value) + : this.convertNonMessageInputIfNecessary(type, value); } } if (logger.isDebugEnabled()) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 9975d48e6..827115fe5 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -505,6 +506,19 @@ public class BeanFactoryAwareFunctionRegistryTests { assertThat(result).isEqualTo("BIKE"); } + @Test + public void testGH_608() { + ApplicationContext context = new SpringApplicationBuilder(SampleFunctionConfiguration.class) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true"); + FunctionCatalog catalog = context.getBean(FunctionCatalog.class); + + Consumer> consumer = catalog.lookup("reactivePojoConsumer"); + consumer.accept(Flux.just("{\"name\":\"Ricky\"}")); + SampleFunctionConfiguration config = context.getBean(SampleFunctionConfiguration.class); + assertThat(((Person) config.consumerInputRef.get()).getName()).isEqualTo("Ricky"); + } + @EnableAutoConfiguration public static class PojoToMessageFunctionCompositionConfiguration { @@ -660,6 +674,8 @@ public class BeanFactoryAwareFunctionRegistryTests { @Configuration protected static class SampleFunctionConfiguration { + AtomicReference consumerInputRef = new AtomicReference<>(); + @Bean public Function uppercasePerson() { return person -> { @@ -816,6 +832,12 @@ public class BeanFactoryAwareFunctionRegistryTests { public Consumer> reactiveConsumer() { return null; } + + @Bean + // Perhaps it should not be allowed. Recommend Function> + public Consumer> reactivePojoConsumer() { + return flux -> flux.subscribe(v -> consumerInputRef.set(v)); + } } @EnableAutoConfiguration