From 18677ca47c845e11ad2826fe9dee0ed900cca462 Mon Sep 17 00:00:00 2001 From: Roman S Samarev Date: Sun, 13 Feb 2022 14:38:14 +0300 Subject: [PATCH] SimpleFunctionRegistry: Fixed: compose of supplier...consumer pipeline produces a supplier type. This fix allows testing of composed pipelines without input and output SimpleFunctionRegistry: added info. No functional changes Resolves #809 --- .../catalog/SimpleFunctionRegistry.java | 7 ++-- .../catalog/SimpleFunctionRegistryTests.java | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 8a4fb73f9..1bfa88fd5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -80,6 +80,7 @@ import org.springframework.util.StringUtils; * such as type conversion, composition, POJO etc. * * @author Oleg Zhurakousky + * @author Roman Samarev * */ public class SimpleFunctionRegistry implements FunctionRegistry { @@ -618,9 +619,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry { Type composedFunctionType; if (afterWrapper.outputType == null) { - composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class, this.inputType == null - ? null - : ResolvableType.forType(this.inputType)).getType(); + composedFunctionType = (this.inputType == null) ? + ResolvableType.forClassWithGenerics(Supplier.class, ResolvableType.forType(Object.class)).getType() : + ResolvableType.forClassWithGenerics(Consumer.class, ResolvableType.forType(this.inputType)).getType(); } else if (this.inputType == null && afterWrapper.outputType != null) { ResolvableType composedOutputType; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index c570c33e8..67a2b4ff0 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; @@ -520,6 +522,30 @@ public class SimpleFunctionRegistryTests { assertThat(FunctionTypeUtils.isMono(function.getOutputType())); } + @Test + public void testFunctionCompositionWithReactiveSupplierAndConsumer() { + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter, + new JacksonMapper(new ObjectMapper())); + + Object reactiveFunc = reactiveFluxSupplier(); + FunctionRegistration functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxSupplier") + .type(ResolvableType.forClassWithGenerics( + Supplier.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType()); + catalog.register(functionRegistration); + + reactiveFunc = reactiveFluxConsumer(); + functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxConsumer") + .type(ResolvableType.forClassWithGenerics( + Consumer.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType()); + catalog.register(functionRegistration); + + FunctionInvocationWrapper lookedUpFunction = catalog + .lookup("reactiveFluxSupplier|reactiveFluxConsumer"); + + assertThat(lookedUpFunction).isNotNull(); + lookedUpFunction.apply(null); + assertThat(consumerDowncounter.get()).isZero(); + } public Function uppercase() { return v -> v.toUpperCase(); @@ -544,6 +570,18 @@ public class SimpleFunctionRegistryTests { }); } + private final AtomicInteger consumerDowncounter = new AtomicInteger(10); + + public Supplier> reactiveFluxSupplier() { + return () -> Flux.fromStream( + IntStream.range(0, consumerDowncounter.get()).boxed().map(i -> Integer.toString(i)) + ); + } + + public Consumer> reactiveFluxConsumer() { + return flux -> flux.subscribe(v -> consumerDowncounter.decrementAndGet()); + } + private FunctionCatalog configureCatalog(Class... configClass) { ApplicationContext context = new SpringApplicationBuilder(configClass) .run("--logging.level.org.springframework.cloud.function=DEBUG",