Fixed a bug where the consumer function could not subscribe to the incoming flow

Resolves #701
Resolves #702
This commit is contained in:
Tsypov Dmitriy
2021-05-21 22:05:22 +07:00
committed by Oleg Zhurakousky
parent c2d0590ec3
commit dc2076c77a
2 changed files with 27 additions and 16 deletions

View File

@@ -167,6 +167,10 @@ public class KotlinLambdaToFunctionAutoConfiguration {
@Override
public void accept(Object input) {
if (CoroutinesUtils.isValidSuspendingFunction(kotlinLambdaTarget, input)) {
CoroutinesUtils.invokeSuspendingConsumer(kotlinLambdaTarget, input);
return;
}
this.apply(input);
}

View File

@@ -34,10 +34,6 @@ import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
*
*/
fun isValidSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Boolean {
return arg0 is Flux<*> && kotlinLambdaTarget is Function2<*, *, *>
}
fun getSuspendingFunctionArgType(type: Type): Type {
return getFlowTypeArguments(type)
}
@@ -93,11 +89,13 @@ private fun getContinuationTypeArguments(type: Type): Type {
fun invokeSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Flux<Any> {
val function = kotlinLambdaTarget as SuspendFunction
val flux = arg0 as Flux<Any>
return fluxSuspendingFlowFunction(flux, function)
}
fun isValidSuspendingSupplier(kotlinLambdaTarget: Any): Boolean {
return kotlinLambdaTarget is Function1<*, *>
return mono(Dispatchers.Unconfined) {
suspendCoroutineUninterceptedOrReturn<Flow<Any>> {
function.invoke(flux.asFlow(), it)
}
}.flatMapMany {
it.asFlux()
}
}
fun invokeSuspendingSupplier(kotlinLambdaTarget: Any): Flux<Any> {
@@ -111,15 +109,24 @@ fun invokeSuspendingSupplier(kotlinLambdaTarget: Any): Flux<Any> {
}
}
fun fluxSuspendingFlowFunction(flux: Flux<Any>, target: SuspendFunction): Flux<Any> {
return mono(Dispatchers.Unconfined) {
suspendCoroutineUninterceptedOrReturn<Flow<Any>> {
target.invoke(flux.asFlow(), it)
fun invokeSuspendingConsumer(kotlinLambdaTarget: Any, arg0: Any) {
val consumer = kotlinLambdaTarget as SuspendConsumer
val flux = arg0 as Flux<Any>
mono(Dispatchers.Unconfined) {
suspendCoroutineUninterceptedOrReturn<Unit> {
consumer.invoke(flux.asFlow(), it)
}
}.flatMapMany {
it.asFlux()
}
}.subscribe()
}
fun isValidSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Boolean {
return arg0 is Flux<*> && kotlinLambdaTarget is Function2<*, *, *>
}
fun isValidSuspendingSupplier(kotlinLambdaTarget: Any): Boolean {
return kotlinLambdaTarget is Function1<*, *>
}
private typealias SuspendFunction = (Any?, Any?) -> Any?
private typealias SuspendConsumer = (Any?, Any?) -> Unit?
private typealias SuspendSupplier = (Any?) -> Any?