diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index 784935aa7..84e8bd38e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -310,17 +310,14 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi else { String[] stages = StringUtils.delimitedListToStringArray(name, "|"); if (Stream.of(stages).allMatch(funcName -> contains(funcName))) { - List composableFunctions = Stream.of(stages) + List> composableFunctions = Stream.of(stages) .map(funcName -> find(funcName)).collect(Collectors.toList()); - composedFunction = composableFunctions.stream() - .reduce((a, z) -> composeFunctions(a, z)) + FunctionRegistration composedRegistration = composableFunctions + .stream().reduce((a, z) -> composeFunctions(a, z)) .orElseGet(() -> null); - if (composedFunction != null && !this.types.containsKey(name) - && this.types.containsKey(stages[0]) - && this.types.containsKey(stages[stages.length - 1])) { - FunctionType input = this.types.get(stages[0]); - FunctionType output = this.types.get(stages[stages.length - 1]); - this.addType(name, FunctionType.compose(input, output)); + composedFunction = composedRegistration.getTarget(); + if (composedFunction != null && !this.types.containsKey(name)) { + this.addType(name, composedRegistration.getType()); this.addName(composedFunction, name); if (composedFunction instanceof Function) { this.addFunction(name, (Function) composedFunction); @@ -347,7 +344,7 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi || getConsumerNames().contains(name); } - private Object find(String name) { + private FunctionRegistration find(String name) { Object result = this.suppliers.get(name); if (result == null) { result = this.functions.get(name); @@ -355,17 +352,23 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi if (result == null) { result = this.consumers.get(name); } - return result; + return getRegistration(result); } @SuppressWarnings("unchecked") - private Object composeFunctions(Object a, Object b) { + private FunctionRegistration composeFunctions(FunctionRegistration aReg, + FunctionRegistration bReg) { + FunctionType aType = aReg.getType(); + FunctionType bType = bReg.getType(); + Object a = aReg.getTarget(); + Object b = bReg.getTarget(); + Object composedFunction = null; if (a instanceof Supplier && b instanceof Function) { Supplier> supplier = (Supplier>) a; if (b instanceof FluxConsumer) { if (supplier instanceof FluxSupplier) { FluxConsumer fConsumer = ((FluxConsumer) b); - return (Supplier>) () -> Mono.from( + composedFunction = (Supplier>) () -> Mono.from( supplier.get().compose(v -> fConsumer.apply(supplier.get()))); } else { @@ -376,7 +379,8 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi } else { Function function = (Function) b; - return (Supplier) () -> function.apply(supplier.get()); + composedFunction = (Supplier) () -> function + .apply(supplier.get()); } } else if (a instanceof Function && b instanceof Function) { @@ -384,7 +388,7 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi Function function2 = (Function) b; if (function1 instanceof FluxToMonoFunction) { if (function2 instanceof MonoToFluxFunction) { - return function1.andThen(function2); + composedFunction = function1.andThen(function2); } else { throw new IllegalStateException( @@ -393,23 +397,27 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi } } else if (function2 instanceof FluxToMonoFunction) { - return new FluxToMonoFunction( + composedFunction = new FluxToMonoFunction( ((Function, Flux>) a).andThen( ((FluxToMonoFunction) b).getTarget())); } else { - return function1.andThen(function2); + composedFunction = function1.andThen(function2); } } else if (a instanceof Function && b instanceof Consumer) { Function function = (Function) a; Consumer consumer = (Consumer) b; - return (Consumer) v -> consumer.accept(function.apply(v)); + composedFunction = (Consumer) v -> consumer.accept(function.apply(v)); } else { throw new IllegalArgumentException(String .format("Could not compose %s and %s", a.getClass(), b.getClass())); } + String name = aReg.getNames().iterator().next() + "|" + + bReg.getNames().iterator().next(); + return new FunctionRegistration<>(composedFunction, name) + .type(FunctionType.compose(aType, bType)); } @SuppressWarnings("unchecked")