Convert function composer to work with registrations

This commit is contained in:
Dave Syer
2019-02-22 08:28:13 +00:00
parent 0189c578ef
commit 616bb1f685

View File

@@ -310,17 +310,14 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
else {
String[] stages = StringUtils.delimitedListToStringArray(name, "|");
if (Stream.of(stages).allMatch(funcName -> contains(funcName))) {
List<Object> composableFunctions = Stream.of(stages)
List<FunctionRegistration<?>> composableFunctions = Stream.of(stages)
.map(funcName -> find(funcName)).collect(Collectors.toList());
composedFunction = composableFunctions.stream()
.reduce((a, z) -> composeFunctions(a, z))
FunctionRegistration<?> composedRegistration = composableFunctions
.stream().reduce((a, z) -> composeFunctions(a, z))
.orElseGet(() -> null);
if (composedFunction != null && !this.types.containsKey(name)
&& this.types.containsKey(stages[0])
&& this.types.containsKey(stages[stages.length - 1])) {
FunctionType input = this.types.get(stages[0]);
FunctionType output = this.types.get(stages[stages.length - 1]);
this.addType(name, FunctionType.compose(input, output));
composedFunction = composedRegistration.getTarget();
if (composedFunction != null && !this.types.containsKey(name)) {
this.addType(name, composedRegistration.getType());
this.addName(composedFunction, name);
if (composedFunction instanceof Function) {
this.addFunction(name, (Function<?, ?>) composedFunction);
@@ -347,7 +344,7 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
|| getConsumerNames().contains(name);
}
private Object find(String name) {
private FunctionRegistration<?> find(String name) {
Object result = this.suppliers.get(name);
if (result == null) {
result = this.functions.get(name);
@@ -355,17 +352,23 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
if (result == null) {
result = this.consumers.get(name);
}
return result;
return getRegistration(result);
}
@SuppressWarnings("unchecked")
private Object composeFunctions(Object a, Object b) {
private FunctionRegistration<?> composeFunctions(FunctionRegistration<?> aReg,
FunctionRegistration<?> bReg) {
FunctionType aType = aReg.getType();
FunctionType bType = bReg.getType();
Object a = aReg.getTarget();
Object b = bReg.getTarget();
Object composedFunction = null;
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
return (Supplier<Mono<Void>>) () -> Mono.from(
composedFunction = (Supplier<Mono<Void>>) () -> Mono.from(
supplier.get().compose(v -> fConsumer.apply(supplier.get())));
}
else {
@@ -376,7 +379,8 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
return (Supplier<Object>) () -> function.apply(supplier.get());
composedFunction = (Supplier<Object>) () -> function
.apply(supplier.get());
}
}
else if (a instanceof Function && b instanceof Function) {
@@ -384,7 +388,7 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
Function<Object, Object> function2 = (Function<Object, Object>) b;
if (function1 instanceof FluxToMonoFunction) {
if (function2 instanceof MonoToFluxFunction) {
return function1.andThen(function2);
composedFunction = function1.andThen(function2);
}
else {
throw new IllegalStateException(
@@ -393,23 +397,27 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
}
else if (function2 instanceof FluxToMonoFunction) {
return new FluxToMonoFunction<Object, Object>(
composedFunction = new FluxToMonoFunction<Object, Object>(
((Function<Flux<Object>, Flux<Object>>) a).andThen(
((FluxToMonoFunction<Object, Object>) b).getTarget()));
}
else {
return function1.andThen(function2);
composedFunction = function1.andThen(function2);
}
}
else if (a instanceof Function && b instanceof Consumer) {
Function<Object, Object> function = (Function<Object, Object>) a;
Consumer<Object> consumer = (Consumer<Object>) b;
return (Consumer<Object>) v -> consumer.accept(function.apply(v));
composedFunction = (Consumer<Object>) v -> consumer.accept(function.apply(v));
}
else {
throw new IllegalArgumentException(String
.format("Could not compose %s and %s", a.getClass(), b.getClass()));
}
String name = aReg.getNames().iterator().next() + "|"
+ bReg.getNames().iterator().next();
return new FunctionRegistration<>(composedFunction, name)
.type(FunctionType.compose(aType, bType));
}
@SuppressWarnings("unchecked")