From b8856bf0e7c627ccd152186be3d42d0912b085ff Mon Sep 17 00:00:00 2001 From: Roman S Samarev Date: Sun, 13 Feb 2022 14:38:14 +0300 Subject: [PATCH] SimpleFunctionRegistry: Fixed: compose of supplier...consumer pipeline produces a supplier type. This fix allows testing of composed pipelines without input and output SimpleFunctionRegistry: added info. No functional changes Resolves #809 --- .../catalog/SimpleFunctionRegistry.java | 7 ++-- .../catalog/SimpleFunctionRegistryTests.java | 38 +++++++++++++++++++ 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index a4b8be79e..2b6cc4ceb 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -80,6 +80,7 @@ import org.springframework.util.StringUtils; * such as type conversion, composition, POJO etc. * * @author Oleg Zhurakousky + * @author Roman Samarev * */ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspector { @@ -625,9 +626,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect Type composedFunctionType; if (afterWrapper.outputType == null) { - composedFunctionType = ResolvableType.forClassWithGenerics(Consumer.class, this.inputType == null - ? null - : ResolvableType.forType(this.inputType)).getType(); + composedFunctionType = (this.inputType == null) ? + ResolvableType.forClassWithGenerics(Supplier.class, ResolvableType.forType(Object.class)).getType() : + ResolvableType.forClassWithGenerics(Consumer.class, ResolvableType.forType(this.inputType)).getType(); } else if (this.inputType == null && afterWrapper.outputType != null) { ResolvableType composedOutputType; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index 062b2d7fa..9cdc5eb79 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; @@ -527,6 +529,30 @@ public class SimpleFunctionRegistryTests { assertThat(FunctionTypeUtils.isMono(function.getOutputType())); } + @Test + public void testFunctionCompositionWithReactiveSupplierAndConsumer() { + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter, + new JacksonMapper(new ObjectMapper())); + + Object reactiveFunc = reactiveFluxSupplier(); + FunctionRegistration functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxSupplier") + .type(ResolvableType.forClassWithGenerics( + Supplier.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType()); + catalog.register(functionRegistration); + + reactiveFunc = reactiveFluxConsumer(); + functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxConsumer") + .type(ResolvableType.forClassWithGenerics( + Consumer.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType()); + catalog.register(functionRegistration); + + FunctionInvocationWrapper lookedUpFunction = catalog + .lookup("reactiveFluxSupplier|reactiveFluxConsumer"); + + assertThat(lookedUpFunction).isNotNull(); + lookedUpFunction.apply(null); + assertThat(consumerDowncounter.get()).isZero(); + } public Function uppercase() { return v -> v.toUpperCase(); @@ -551,6 +577,18 @@ public class SimpleFunctionRegistryTests { }); } + private final AtomicInteger consumerDowncounter = new AtomicInteger(10); + + public Supplier> reactiveFluxSupplier() { + return () -> Flux.fromStream( + IntStream.range(0, consumerDowncounter.get()).boxed().map(i -> Integer.toString(i)) + ); + } + + public Consumer> reactiveFluxConsumer() { + return flux -> flux.subscribe(v -> consumerDowncounter.decrementAndGet()); + } + private FunctionCatalog configureCatalog(Class... configClass) { ApplicationContext context = new SpringApplicationBuilder(configClass) .run("--logging.level.org.springframework.cloud.function=DEBUG",