From 777f0d8afa2053d3d9a3d6cf13d016d4ff903e0c Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 5 Mar 2020 08:56:46 +0100 Subject: [PATCH] GH-453 Provide continuation model for reactive consumer Modified invocation model of the reactive consumer to ensure it provides reasonable continuation path via MonoIgnoreElements (Mono.. . .then()) Resolves #453 --- .../BeanFactoryAwareFunctionRegistry.java | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java index 4f6f284ae..940a632b2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistry.java @@ -545,7 +545,21 @@ public class BeanFactoryAwareFunctionRegistry invocationResult = ((Supplier) target).get(); } else { - ((Consumer) this.target).accept(input); + if (input instanceof Flux) { + invocationResult = ((Flux) input).transform(flux -> { + ((Consumer) this.target).accept(flux); + return Mono.ignoreElements((Flux) flux); + }).then(); + } + else if (input instanceof Mono) { + invocationResult = ((Mono) input).transform(flux -> { + ((Consumer) this.target).accept(flux); + return Mono.ignoreElements((Mono) flux); + }).then(); + } + else { + ((Consumer) this.target).accept(input); + } } if (!(this.target instanceof Consumer) && logger.isDebugEnabled()) { @@ -566,7 +580,6 @@ public class BeanFactoryAwareFunctionRegistry this.convertInputPublisherIfNecessary((Publisher) input, FunctionTypeUtils.getInputType(this.functionType, 0)); if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(this.functionType, 0))) { result = this.invokeFunction(input); - result = result == null ? Mono.empty() : result; } else { if (this.composed) {