Removed duplication of fluxification logic
- Removed duplication of fluxification logic in FunctionRegistration and ContextFunctionCatalog. - Polished FunctionRegistration.wrap logic - Added initial javadoc
This commit is contained in:
@@ -34,6 +34,8 @@ import org.springframework.beans.factory.BeanNameAware;
|
||||
import org.springframework.cloud.function.core.FluxConsumer;
|
||||
import org.springframework.cloud.function.core.FluxFunction;
|
||||
import org.springframework.cloud.function.core.FluxSupplier;
|
||||
import org.springframework.cloud.function.core.FluxToMonoFunction;
|
||||
import org.springframework.cloud.function.core.MonoToFluxFunction;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
@@ -132,35 +134,48 @@ public class FunctionRegistration<T> implements BeanNameAware {
|
||||
return this.names(Arrays.asList(names));
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms (wraps) function identified by the 'target' to its {@code Flux}
|
||||
* equivalent unless it already is. For example, {@code Function<String, String>}
|
||||
* becomes {@code Function<Flux<String>, Flux<String>>}
|
||||
* @param <S> the expected target type of the function (e.g., FluxFunction)
|
||||
* @return {@code FunctionRegistration} with the appropriately wrapped target.
|
||||
*
|
||||
*/
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public <S> FunctionRegistration<S> wrap() {
|
||||
if (this.type == null || this.type.isWrapper()) {
|
||||
@SuppressWarnings("unchecked")
|
||||
FunctionRegistration<S> value = (FunctionRegistration<S>) this;
|
||||
return value;
|
||||
FunctionRegistration<S> result;
|
||||
if (this.type == null) {
|
||||
result = (FunctionRegistration<S>) this;
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
S target = (S) this.target;
|
||||
FunctionRegistration<S> result = new FunctionRegistration<S>(target);
|
||||
result.type(this.type.getType());
|
||||
if (target instanceof Function) {
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
S wrapped = (S) new FluxFunction((Function) target);
|
||||
target = wrapped;
|
||||
result.type = result.type.wrap(Flux.class);
|
||||
else {
|
||||
S target = (S) this.target;
|
||||
result = new FunctionRegistration<S>(target);
|
||||
result.type(this.type.getType());
|
||||
boolean flux = type.isWrapper();
|
||||
if (!flux) {
|
||||
if (target instanceof Function) {
|
||||
target = (S) new FluxFunction((Function<?, ?>) target);
|
||||
}
|
||||
else if (target instanceof Supplier) {
|
||||
target = (S) new FluxSupplier((Supplier<?>) target);
|
||||
}
|
||||
else if (target instanceof Consumer) {
|
||||
target = (S) new FluxConsumer((Consumer<?>) target);
|
||||
}
|
||||
}
|
||||
|
||||
if (Mono.class.isAssignableFrom(type.getOutputWrapper())) {
|
||||
target = (S) new FluxToMonoFunction((Function) target);
|
||||
}
|
||||
else if (Mono.class.isAssignableFrom(type.getInputWrapper())) {
|
||||
target = (S) new MonoToFluxFunction((Function) target);
|
||||
}
|
||||
result = result.target(target).names(this.names)
|
||||
.type(result.type.wrap(Flux.class)).properties(this.properties);
|
||||
}
|
||||
else if (target instanceof Supplier) {
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
S wrapped = (S) new FluxSupplier((Supplier) target);
|
||||
target = wrapped;
|
||||
result.type = result.type.wrap(Flux.class);
|
||||
}
|
||||
else if (target instanceof Consumer) {
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
S wrapped = (S) new FluxConsumer((Consumer) target);
|
||||
target = wrapped;
|
||||
result.type = result.type.wrap(Flux.class, Mono.class);
|
||||
}
|
||||
return result.target(target).names(this.names).properties(this.properties);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -62,7 +62,6 @@ import org.springframework.cloud.function.context.catalog.FunctionInspector;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent;
|
||||
import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent;
|
||||
import org.springframework.cloud.function.core.FluxConsumer;
|
||||
import org.springframework.cloud.function.core.FluxFunction;
|
||||
import org.springframework.cloud.function.core.FluxSupplier;
|
||||
import org.springframework.cloud.function.core.FluxToMonoFunction;
|
||||
import org.springframework.cloud.function.core.IsolatedConsumer;
|
||||
@@ -420,7 +419,7 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
registration.type(findType(target).getType());
|
||||
}
|
||||
Class<?> type;
|
||||
registration = transform(registration);
|
||||
registration = isolated(registration).wrap();
|
||||
target = registration.getTarget();
|
||||
if (target instanceof Supplier) {
|
||||
type = Supplier.class;
|
||||
@@ -450,41 +449,6 @@ public class ContextFunctionCatalogAutoConfiguration {
|
||||
}
|
||||
}
|
||||
|
||||
private FunctionRegistration<?> transform(FunctionRegistration<?> registration) {
|
||||
return fluxify(isolated(registration));
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private FunctionRegistration<?> fluxify(FunctionRegistration<?> input) {
|
||||
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
|
||||
Object target = registration.getTarget();
|
||||
FunctionType type = registration.getType();
|
||||
boolean flux = hasFluxTypes(type);
|
||||
if (!flux) {
|
||||
if (target instanceof Supplier<?>) {
|
||||
target = new FluxSupplier((Supplier<?>) target);
|
||||
}
|
||||
else if (target instanceof Function<?, ?>) {
|
||||
target = new FluxFunction((Function<?, ?>) target);
|
||||
}
|
||||
else if (target instanceof Consumer<?>) {
|
||||
target = new FluxConsumer((Consumer<?>) target);
|
||||
}
|
||||
registration.target(target);
|
||||
}
|
||||
if (Mono.class.isAssignableFrom(type.getOutputWrapper())) {
|
||||
registration.target(new FluxToMonoFunction<>((Function) target));
|
||||
}
|
||||
else if (Mono.class.isAssignableFrom(type.getInputWrapper())) {
|
||||
registration.target(new MonoToFluxFunction<>((Function) target));
|
||||
}
|
||||
return registration;
|
||||
}
|
||||
|
||||
private boolean hasFluxTypes(FunctionType type) {
|
||||
return type.isWrapper();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private FunctionRegistration<?> isolated(FunctionRegistration<?> input) {
|
||||
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
|
||||
|
||||
Reference in New Issue
Block a user