Support for implicit functions in composition

If there is only one function then empty String can be used as a
name to look it up.
This commit is contained in:
Dave Syer
2019-02-28 09:55:11 -05:00
parent b48c7b5dc0
commit 8d834a7483
3 changed files with 112 additions and 19 deletions

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.function.context.catalog;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -25,8 +27,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -167,7 +167,8 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
* @return the name of the function or null.
*/
public String lookupFunctionName(Object function) {
return this.names.containsKey(function) ? this.names.get(function) : null;
return function != null && this.names.containsKey(function)
? this.names.get(function) : null;
}
@Override
@@ -311,10 +312,9 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
else {
String[] stages = StringUtils.delimitedListToStringArray(name, "|");
if (Stream.of(stages).allMatch(funcName -> contains(funcName))) {
List<FunctionRegistration<?>> composableFunctions = find(stages);
if (!composableFunctions.isEmpty()) {
List<FunctionRegistration<?>> composableFunctions = Stream.of(stages)
.map(funcName -> find(funcName)).collect(Collectors.toList());
FunctionRegistration<?> composedRegistration = composableFunctions
.stream().reduce((a, z) -> composeFunctions(a, z))
.orElseGet(() -> null);
@@ -348,9 +348,57 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
return name.replaceAll(",", "|").trim();
}
private boolean contains(String name) {
return getSupplierNames().contains(name) || getFunctionNames().contains(name)
|| getConsumerNames().contains(name);
private List<FunctionRegistration<?>> find(String[] stages) {
if (stages.length == 0) {
return Collections.emptyList();
}
if (stages.length == 1) {
FunctionRegistration<?> found = find(stages[0]);
return found == null ? Collections.emptyList() : Arrays.asList(found);
}
List<FunctionRegistration<?>> list = new ArrayList<>();
for (int i = 0; i < stages.length; i++) {
String name = stages[i];
FunctionRegistration<?> registration = find(name);
if (registration != null) {
list.add(registration);
}
else {
if (i == 0) {
// Supplier or Function
if (this.suppliers.size() == 1) {
name = this.suppliers.keySet().iterator().next();
}
else if (this.functions.size() == 1) {
name = this.functions.keySet().iterator().next();
}
}
else if (i == stages.length - 1) {
// Consumer or Function
if (this.consumers.size() == 1) {
name = this.consumers.keySet().iterator().next();
}
else if (this.functions.size() == 1) {
name = this.functions.keySet().iterator().next();
}
}
else {
// Function
if (this.functions.size() == 1) {
name = this.functions.keySet().iterator().next();
}
}
registration = find(name);
if (registration != null) {
list.add(registration);
}
else {
// Can't find default stage so we can't compose anything
return Collections.emptyList();
}
}
}
return list;
}
private FunctionRegistration<?> find(String name) {

View File

@@ -95,7 +95,8 @@ public class ContextFunctionCatalogAutoConfiguration {
@Override
public FunctionRegistration<?> getRegistration(Object function) {
String functionName = this.lookupFunctionName(function);
String functionName = function == null ? null
: this.lookupFunctionName(function);
if (StringUtils.hasText(functionName)) {
FunctionRegistration<?> registration = new FunctionRegistration<Object>(
function, functionName);

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.context.catalog;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Test;
import reactor.core.publisher.Flux;
@@ -39,7 +40,7 @@ public class InMemoryFunctionCatalogTests {
public void testFunctionRegistration() {
TestFunction function = new TestFunction();
FunctionRegistration<TestFunction> registration = new FunctionRegistration<>(
function, "foo").type(FunctionType.of(TestFunction.class).getType());
function, "foo").type(FunctionType.of(TestFunction.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(registration);
FunctionRegistration<?> registration2 = catalog.getRegistration(function);
@@ -50,7 +51,7 @@ public class InMemoryFunctionCatalogTests {
public void testFunctionLookup() {
TestFunction function = new TestFunction();
FunctionRegistration<TestFunction> registration = new FunctionRegistration<>(
function, "foo").type(FunctionType.of(TestFunction.class).getType());
function, "foo").type(FunctionType.of(TestFunction.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(registration);
@@ -68,10 +69,9 @@ public class InMemoryFunctionCatalogTests {
@Test
public void testFunctionComposition() {
FunctionRegistration<UpperCase> upperCaseRegistration = new FunctionRegistration<>(
new UpperCase(), "uppercase")
.type(FunctionType.of(UpperCase.class).getType());
new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class));
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType());
new Reverse(), "reverse").type(FunctionType.of(Reverse.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(upperCaseRegistration);
catalog.register(reverseRegistration);
@@ -85,14 +85,49 @@ public class InMemoryFunctionCatalogTests {
.isEqualTo("RATS");
}
@Test
public void testFunctionCompositionImplicit() {
FunctionRegistration<Words> wordsRegistration = new FunctionRegistration<>(
new Words(), "words").type(FunctionType.of(Words.class));
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(wordsRegistration);
catalog.register(reverseRegistration);
// There's only one function, we should be able to leave that blank
Supplier<Flux<String>> lookedUpFunction = catalog.lookup("words|");
assertThat(catalog.getFunctionType("words|").isMessage()).isFalse();
assertThat(lookedUpFunction).isNotNull();
assertThat(lookedUpFunction.get().blockFirst()).isEqualTo("olleh");
}
@Test
public void testFunctionCompositionExplicit() {
FunctionRegistration<Words> wordsRegistration = new FunctionRegistration<>(
new Words(), "words").type(FunctionType.of(Words.class));
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(wordsRegistration);
catalog.register(reverseRegistration);
Supplier<Flux<String>> lookedUpFunction = catalog.lookup("words|reverse");
assertThat(catalog.getFunctionType("words|reverse").isMessage()).isFalse();
assertThat(lookedUpFunction).isNotNull();
assertThat(lookedUpFunction.get().blockFirst()).isEqualTo("olleh");
}
@Test
public void testFunctionCompositionWithMessages() {
FunctionRegistration<UpperCaseMessage> upperCaseRegistration = new FunctionRegistration<>(
new UpperCaseMessage(), "uppercase")
.type(FunctionType.of(UpperCaseMessage.class).getType());
.type(FunctionType.of(UpperCaseMessage.class));
FunctionRegistration<ReverseMessage> reverseRegistration = new FunctionRegistration<>(
new ReverseMessage(), "reverse")
.type(FunctionType.of(ReverseMessage.class).getType());
.type(FunctionType.of(ReverseMessage.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(upperCaseRegistration);
catalog.register(reverseRegistration);
@@ -113,9 +148,9 @@ public class InMemoryFunctionCatalogTests {
public void testFunctionCompositionMixedMessages() {
FunctionRegistration<UpperCaseMessage> upperCaseRegistration = new FunctionRegistration<>(
new UpperCaseMessage(), "uppercase")
.type(FunctionType.of(UpperCaseMessage.class).getType());
.type(FunctionType.of(UpperCaseMessage.class));
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType());
new Reverse(), "reverse").type(FunctionType.of(Reverse.class));
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(upperCaseRegistration);
catalog.register(reverseRegistration);
@@ -132,6 +167,15 @@ public class InMemoryFunctionCatalogTests {
assertThat(message.getHeaders().get("foo")).isEqualTo("bar");
}
private static class Words implements Supplier<String> {
@Override
public String get() {
return "hello";
}
}
private static class UpperCase implements Function<String, String> {
@Override