GH-453 Provide continuation model for reactive consumer

Modified invocation model of the reactive consumer to ensure it provides reasonable continuation path via MonoIgnoreElements (Mono.. . .then())

Resolves #453
This commit is contained in:
Oleg Zhurakousky
2020-03-05 08:56:46 +01:00
parent c85d4f1629
commit 777f0d8afa

View File

@@ -545,7 +545,21 @@ public class BeanFactoryAwareFunctionRegistry
invocationResult = ((Supplier) target).get();
}
else {
((Consumer) this.target).accept(input);
if (input instanceof Flux) {
invocationResult = ((Flux) input).transform(flux -> {
((Consumer) this.target).accept(flux);
return Mono.ignoreElements((Flux) flux);
}).then();
}
else if (input instanceof Mono) {
invocationResult = ((Mono) input).transform(flux -> {
((Consumer) this.target).accept(flux);
return Mono.ignoreElements((Mono) flux);
}).then();
}
else {
((Consumer) this.target).accept(input);
}
}
if (!(this.target instanceof Consumer) && logger.isDebugEnabled()) {
@@ -566,7 +580,6 @@ public class BeanFactoryAwareFunctionRegistry
this.convertInputPublisherIfNecessary((Publisher<?>) input, FunctionTypeUtils.getInputType(this.functionType, 0));
if (FunctionTypeUtils.isReactive(FunctionTypeUtils.getInputType(this.functionType, 0))) {
result = this.invokeFunction(input);
result = result == null ? Mono.empty() : result;
}
else {
if (this.composed) {