diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index 484f36011..9ec23ed87 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -36,6 +36,7 @@ import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; import org.springframework.cloud.function.core.FluxedConsumer; +import org.springframework.cloud.function.core.FluxedFunction; import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -171,7 +172,7 @@ public class FunctionRegistration implements BeanNameAware { target = (S) new FluxedConsumer((Consumer) target); } else if (target instanceof Function) { - // target = (S) new FluxedFunction((Function) target); + target = (S) new FluxedFunction((Function) target); } result = result.target(target).names(this.names) diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java index cfff5d9fa..ed02e923f 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxToMonoFunction.java @@ -22,7 +22,12 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Marker wrapper for target {@code Function, Mono>}. + * Wrapper to mark function {@code Function, Mono>}. + * + * While it may look similar to {@link FluxedConsumer} + * the fundamental difference is that this class represents a function that + * returns {@link Mono} of type {@code }, while {@link FluxedConsumer} is + * a consumer that has been decorated as {@code Function, Mono>}. * * @param type of {@link Flux} input of the target function * @param type of {@link Mono} output of the target function @@ -30,25 +35,15 @@ import reactor.core.publisher.Mono; * @since 2.0 */ public class FluxToMonoFunction - implements Function, Mono>, FluxWrapper, Mono>> { + extends WrappedFunction, Mono, Function, Mono>> { - private final Function, Mono> function; - - /** - * @param function target function - */ - public FluxToMonoFunction(Function, Mono> function) { - this.function = function; - } - - @Override - public Function, Mono> getTarget() { - return this.function; + public FluxToMonoFunction(Function, Mono> target) { + super(target); } @Override public Mono apply(Flux input) { - return this.function.apply(input); + return this.getTarget().apply(input); } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java index 1710873d7..48cbfe30b 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/MonoToFluxFunction.java @@ -30,25 +30,15 @@ import reactor.core.publisher.Mono; * @since 2.0 */ public class MonoToFluxFunction - implements Function, Flux>, FluxWrapper, Flux>> { + extends WrappedFunction, Flux, Function, Flux>> { - private final Function, Flux> function; - - /** - * @param function target function - */ - public MonoToFluxFunction(Function, Flux> function) { - this.function = function; - } - - @Override - public Function, Flux> getTarget() { - return this.function; + public MonoToFluxFunction(Function, Flux> target) { + super(target); } @Override public Flux apply(Mono input) { - return this.function.apply(input); + return this.getTarget().apply(input); } }