From dc2076c77a61e5268ed2c6b756b85b2e91d5d454 Mon Sep 17 00:00:00 2001 From: Tsypov Dmitriy Date: Fri, 21 May 2021 22:05:22 +0700 Subject: [PATCH] Fixed a bug where the consumer function could not subscribe to the incoming flow Resolves #701 Resolves #702 --- ...tlinLambdaToFunctionAutoConfiguration.java | 4 ++ .../context/config/CoroutinesUtils.kt | 39 +++++++++++-------- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/KotlinLambdaToFunctionAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/KotlinLambdaToFunctionAutoConfiguration.java index 4f5289f55..18d131af5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/KotlinLambdaToFunctionAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/KotlinLambdaToFunctionAutoConfiguration.java @@ -167,6 +167,10 @@ public class KotlinLambdaToFunctionAutoConfiguration { @Override public void accept(Object input) { + if (CoroutinesUtils.isValidSuspendingFunction(kotlinLambdaTarget, input)) { + CoroutinesUtils.invokeSuspendingConsumer(kotlinLambdaTarget, input); + return; + } this.apply(input); } diff --git a/spring-cloud-function-context/src/main/kotlin/org/springframework/cloud/function/context/config/CoroutinesUtils.kt b/spring-cloud-function-context/src/main/kotlin/org/springframework/cloud/function/context/config/CoroutinesUtils.kt index 0d2318456..8614b69db 100644 --- a/spring-cloud-function-context/src/main/kotlin/org/springframework/cloud/function/context/config/CoroutinesUtils.kt +++ b/spring-cloud-function-context/src/main/kotlin/org/springframework/cloud/function/context/config/CoroutinesUtils.kt @@ -34,10 +34,6 @@ import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn * */ -fun isValidSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Boolean { - return arg0 is Flux<*> && kotlinLambdaTarget is Function2<*, *, *> -} - fun getSuspendingFunctionArgType(type: Type): Type { return getFlowTypeArguments(type) } @@ -93,11 +89,13 @@ private fun getContinuationTypeArguments(type: Type): Type { fun invokeSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Flux { val function = kotlinLambdaTarget as SuspendFunction val flux = arg0 as Flux - return fluxSuspendingFlowFunction(flux, function) -} - -fun isValidSuspendingSupplier(kotlinLambdaTarget: Any): Boolean { - return kotlinLambdaTarget is Function1<*, *> + return mono(Dispatchers.Unconfined) { + suspendCoroutineUninterceptedOrReturn> { + function.invoke(flux.asFlow(), it) + } + }.flatMapMany { + it.asFlux() + } } fun invokeSuspendingSupplier(kotlinLambdaTarget: Any): Flux { @@ -111,15 +109,24 @@ fun invokeSuspendingSupplier(kotlinLambdaTarget: Any): Flux { } } -fun fluxSuspendingFlowFunction(flux: Flux, target: SuspendFunction): Flux { - return mono(Dispatchers.Unconfined) { - suspendCoroutineUninterceptedOrReturn> { - target.invoke(flux.asFlow(), it) +fun invokeSuspendingConsumer(kotlinLambdaTarget: Any, arg0: Any) { + val consumer = kotlinLambdaTarget as SuspendConsumer + val flux = arg0 as Flux + mono(Dispatchers.Unconfined) { + suspendCoroutineUninterceptedOrReturn { + consumer.invoke(flux.asFlow(), it) } - }.flatMapMany { - it.asFlux() - } + }.subscribe() +} + +fun isValidSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Boolean { + return arg0 is Flux<*> && kotlinLambdaTarget is Function2<*, *, *> +} + +fun isValidSuspendingSupplier(kotlinLambdaTarget: Any): Boolean { + return kotlinLambdaTarget is Function1<*, *> } private typealias SuspendFunction = (Any?, Any?) -> Any? +private typealias SuspendConsumer = (Any?, Any?) -> Unit? private typealias SuspendSupplier = (Any?) -> Any?