diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index af98cad1d..fc2b56d8b 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -1088,6 +1088,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object convertInputPublisherIfNecessary(Publisher publisher, Type type) { + if (FunctionTypeUtils.isMono(type) && publisher instanceof Flux) { + publisher = Mono.from(publisher); + } + else if (FunctionTypeUtils.isFlux(type) && publisher instanceof Mono) { + publisher = Flux.from(publisher); + } Type actualType = type != null ? FunctionTypeUtils.getGenericType(type) : type; return publisher instanceof Mono ? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType)) diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 925f13e85..b4a8b5646 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -33,6 +33,7 @@ import java.util.stream.Collectors; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.util.function.Tuple2; @@ -530,6 +531,17 @@ public class BeanFactoryAwareFunctionRegistryTests { assertThat(((Person) config.consumerInputRef.get()).getName()).isEqualTo("Ricky"); } + @Test + public void testGH_609() { + FunctionCatalog catalog = this.configureCatalog(SampleFunctionConfiguration.class); + Function, Publisher> f = catalog.lookup("monoToMono"); + Mono result = (Mono) f.apply(Mono.just("hello")); + assertThat(result.block()).isEqualTo("hello"); + + result = (Mono) f.apply(Flux.just("hello")); + assertThat(result.block()).isEqualTo("hello"); + } + @EnableAutoConfiguration public static class PojoToMessageFunctionCompositionConfiguration {