diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java index 0b5394c0c..c75e40df6 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionRegistration.java @@ -34,6 +34,8 @@ import org.springframework.beans.factory.BeanNameAware; import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; +import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -132,35 +134,48 @@ public class FunctionRegistration implements BeanNameAware { return this.names(Arrays.asList(names)); } + /** + * Transforms (wraps) function identified by the 'target' to its {@code Flux} + * equivalent unless it already is. For example, {@code Function} + * becomes {@code Function, Flux>} + * @param the expected target type of the function (e.g., FluxFunction) + * @return {@code FunctionRegistration} with the appropriately wrapped target. + * + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) public FunctionRegistration wrap() { - if (this.type == null || this.type.isWrapper()) { - @SuppressWarnings("unchecked") - FunctionRegistration value = (FunctionRegistration) this; - return value; + FunctionRegistration result; + if (this.type == null) { + result = (FunctionRegistration) this; } - @SuppressWarnings("unchecked") - S target = (S) this.target; - FunctionRegistration result = new FunctionRegistration(target); - result.type(this.type.getType()); - if (target instanceof Function) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - S wrapped = (S) new FluxFunction((Function) target); - target = wrapped; - result.type = result.type.wrap(Flux.class); + else { + S target = (S) this.target; + result = new FunctionRegistration(target); + result.type(this.type.getType()); + boolean flux = type.isWrapper(); + if (!flux) { + if (target instanceof Function) { + target = (S) new FluxFunction((Function) target); + } + else if (target instanceof Supplier) { + target = (S) new FluxSupplier((Supplier) target); + } + else if (target instanceof Consumer) { + target = (S) new FluxConsumer((Consumer) target); + } + } + + if (Mono.class.isAssignableFrom(type.getOutputWrapper())) { + target = (S) new FluxToMonoFunction((Function) target); + } + else if (Mono.class.isAssignableFrom(type.getInputWrapper())) { + target = (S) new MonoToFluxFunction((Function) target); + } + result = result.target(target).names(this.names) + .type(result.type.wrap(Flux.class)).properties(this.properties); } - else if (target instanceof Supplier) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - S wrapped = (S) new FluxSupplier((Supplier) target); - target = wrapped; - result.type = result.type.wrap(Flux.class); - } - else if (target instanceof Consumer) { - @SuppressWarnings({ "unchecked", "rawtypes" }) - S wrapped = (S) new FluxConsumer((Consumer) target); - target = wrapped; - result.type = result.type.wrap(Flux.class, Mono.class); - } - return result.target(target).names(this.names).properties(this.properties); + + return result; } @Override diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index 021a858e3..d6987f3cc 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -62,7 +62,6 @@ import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent; import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent; import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; import org.springframework.cloud.function.core.IsolatedConsumer; @@ -420,7 +419,7 @@ public class ContextFunctionCatalogAutoConfiguration { registration.type(findType(target).getType()); } Class type; - registration = transform(registration); + registration = isolated(registration).wrap(); target = registration.getTarget(); if (target instanceof Supplier) { type = Supplier.class; @@ -450,41 +449,6 @@ public class ContextFunctionCatalogAutoConfiguration { } } - private FunctionRegistration transform(FunctionRegistration registration) { - return fluxify(isolated(registration)); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private FunctionRegistration fluxify(FunctionRegistration input) { - FunctionRegistration registration = (FunctionRegistration) input; - Object target = registration.getTarget(); - FunctionType type = registration.getType(); - boolean flux = hasFluxTypes(type); - if (!flux) { - if (target instanceof Supplier) { - target = new FluxSupplier((Supplier) target); - } - else if (target instanceof Function) { - target = new FluxFunction((Function) target); - } - else if (target instanceof Consumer) { - target = new FluxConsumer((Consumer) target); - } - registration.target(target); - } - if (Mono.class.isAssignableFrom(type.getOutputWrapper())) { - registration.target(new FluxToMonoFunction<>((Function) target)); - } - else if (Mono.class.isAssignableFrom(type.getInputWrapper())) { - registration.target(new MonoToFluxFunction<>((Function) target)); - } - return registration; - } - - private boolean hasFluxTypes(FunctionType type) { - return type.isWrapper(); - } - @SuppressWarnings({ "rawtypes", "unchecked" }) private FunctionRegistration isolated(FunctionRegistration input) { FunctionRegistration registration = (FunctionRegistration) input;