From aaf268ea40018b54bf35bb2f1d52678483f32514 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Thu, 21 Feb 2019 09:53:15 +0000 Subject: [PATCH] Ensure FunctionTypes get registered for composite functions Without this change the type of a composed function in the InMemoryFunctionCatalog is always null. The key is to register the type at the same time as the function is registered. Also some format and javadoc fixes (cosmetic) --- .../AbstractComposableFunctionRegistry.java | 81 ++++++++++--------- .../catalog/InMemoryFunctionCatalog.java | 3 +- ...ntextFunctionCatalogAutoConfiguration.java | 42 +++++----- .../catalog/InMemoryFunctionCatalogTests.java | 58 ++++++++++++- .../BeanFactoryFunctionCatalogTests.java | 3 +- .../cloud/function/core/FluxFunction.java | 4 +- .../cloud/function/core/FluxedFunction.java | 4 +- .../function-sample-aws/pom.xml | 3 +- 8 files changed, 131 insertions(+), 67 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index df4d8fa31..1229ff3c3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -50,10 +50,9 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; /** - * Base implementation of {@link FunctionRegistry} which supports - * function composition during lookups. For example if this registry contains - * function 'a' and 'b' you can compose them into a single function by - * simply piping two names together during the + * Base implementation of {@link FunctionRegistry} which supports function composition + * during lookups. For example if this registry contains function 'a' and 'b' you can + * compose them into a single function by simply piping two names together during the * lookup {@code this.lookup(Function.class, "a|b")}. * * Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}). @@ -62,9 +61,8 @@ import org.springframework.util.StringUtils; * @since 2.1 * */ -public abstract class AbstractComposableFunctionRegistry - implements FunctionRegistry, FunctionInspector, - ApplicationEventPublisherAware, EnvironmentAware { +public abstract class AbstractComposableFunctionRegistry implements FunctionRegistry, + FunctionInspector, ApplicationEventPublisherAware, EnvironmentAware { private final Map suppliers = new ConcurrentHashMap<>(); @@ -93,10 +91,12 @@ public abstract class AbstractComposableFunctionRegistry @Override public Set getNames(Class type) { if (type == null) { - return new HashSet(getSupplierNames()) {{ - addAll(getConsumerNames()); - addAll(getFunctionNames()); - } }; + return new HashSet(getSupplierNames()) { + { + addAll(getConsumerNames()); + addAll(getFunctionNames()); + } + }; } if (Supplier.class.isAssignableFrom(type)) { return this.getSupplierNames(); @@ -150,8 +150,10 @@ public abstract class AbstractComposableFunctionRegistry * The count of all Suppliers, Function and Consumers currently registered. * @return the count of all Suppliers, Function and Consumers currently registered. */ + @Override public int size() { - return getSupplierNames().size() + getFunctionNames().size() + getConsumerNames().size(); + return getSupplierNames().size() + getFunctionNames().size() + + getConsumerNames().size(); } public FunctionType getFunctionType(String name) { @@ -159,10 +161,9 @@ public abstract class AbstractComposableFunctionRegistry } /** - * A reverse lookup where one can determine the actual name of - * the function reference. - * @param function should be an instance of {@link Supplier}, - * {@link Function} or {@link Consumer}; + * A reverse lookup where one can determine the actual name of the function reference. + * @param function should be an instance of {@link Supplier}, {@link Function} or + * {@link Consumer}; * @return the name of the function or null. */ public String lookupFunctionName(Object function) { @@ -227,9 +228,10 @@ public abstract class AbstractComposableFunctionRegistry if (functionType != null) { return functionType; } - throw new IllegalStateException("Unless FunctionType is already available in FunctionRegistration, " - + "this operation must be overriden " - + "by the implementation of the FunctionRegistry."); + throw new IllegalStateException( + "Unless FunctionType is already available in FunctionRegistration, " + + "this operation must be overriden " + + "by the implementation of the FunctionRegistry."); } protected void addSupplier(String name, Supplier supplier) { @@ -275,7 +277,7 @@ public abstract class AbstractComposableFunctionRegistry return registration; } - private void addType(String name, FunctionType functionType) { + protected void addType(String name, FunctionType functionType) { this.types.computeIfAbsent(name, str -> functionType); } @@ -308,11 +310,13 @@ public abstract class AbstractComposableFunctionRegistry else { String[] stages = StringUtils.delimitedListToStringArray(name, "|"); if (Stream.of(stages).allMatch(funcName -> contains(funcName))) { - List composableFunctions = Stream.of(stages).map(funcName -> find(funcName)) - .collect(Collectors.toList()); - composedFunction = composableFunctions.stream().reduce((a, z) -> composeFunctions(a, z)) + List composableFunctions = Stream.of(stages) + .map(funcName -> find(funcName)).collect(Collectors.toList()); + composedFunction = composableFunctions.stream() + .reduce((a, z) -> composeFunctions(a, z)) .orElseGet(() -> null); - if (composedFunction != null && !this.types.containsKey(name) && this.types.containsKey(stages[0]) + if (composedFunction != null && !this.types.containsKey(name) + && this.types.containsKey(stages[0]) && this.types.containsKey(stages[stages.length - 1])) { FunctionType input = this.types.get(stages[0]); FunctionType output = this.types.get(stages[stages.length - 1]); @@ -339,17 +343,17 @@ public abstract class AbstractComposableFunctionRegistry } private boolean contains(String name) { - return getSupplierNames().contains(name) - || getFunctionNames().contains(name) || getConsumerNames().contains(name); + return getSupplierNames().contains(name) || getFunctionNames().contains(name) + || getConsumerNames().contains(name); } private Object find(String name) { - Object result = suppliers.get(name); + Object result = this.suppliers.get(name); if (result == null) { - result = functions.get(name); + result = this.functions.get(name); } if (result == null) { - result = consumers.get(name); + result = this.consumers.get(name); } return result; } @@ -361,8 +365,8 @@ public abstract class AbstractComposableFunctionRegistry if (b instanceof FluxConsumer) { if (supplier instanceof FluxSupplier) { FluxConsumer fConsumer = ((FluxConsumer) b); - return (Supplier>) () -> Mono - .from(supplier.get().compose(v -> fConsumer.apply(supplier.get()))); + return (Supplier>) () -> Mono.from( + supplier.get().compose(v -> fConsumer.apply(supplier.get()))); } else { throw new IllegalStateException( @@ -383,13 +387,15 @@ public abstract class AbstractComposableFunctionRegistry return function1.andThen(function2); } else { - throw new IllegalStateException("The provided function is finite (i.e., returns Mono) " - + "therefore it can *only* be composed with compatible function (i.e., Function"); + throw new IllegalStateException( + "The provided function is finite (i.e., returns Mono) " + + "therefore it can *only* be composed with compatible function (i.e., Function"); } } else if (function2 instanceof FluxToMonoFunction) { - return new FluxToMonoFunction(((Function, Flux>) a) - .andThen(((FluxToMonoFunction) b).getTarget())); + return new FluxToMonoFunction( + ((Function, Flux>) a).andThen( + ((FluxToMonoFunction) b).getTarget())); } else { return function1.andThen(function2); @@ -401,8 +407,8 @@ public abstract class AbstractComposableFunctionRegistry return (Consumer) v -> consumer.accept(function.apply(v)); } else { - throw new IllegalArgumentException( - String.format("Could not compose %s and %s", a.getClass(), b.getClass())); + throw new IllegalArgumentException(String + .format("Could not compose %s and %s", a.getClass(), b.getClass())); } } @@ -429,4 +435,5 @@ public abstract class AbstractComposableFunctionRegistry } return function; } + } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java index 75e5c8478..445f59229 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java @@ -56,7 +56,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry Assert.notEmpty(functionRegistration.getNames(), "'registration' must contain at least one name before it is registered in catalog."); // TODO should we just delegate to wrap(..)???? - //wrap(functionRegistration, functionRegistration.getNames().iterator().next()); + // wrap(functionRegistration, functionRegistration.getNames().iterator().next()); Class type = Object.class; if (functionRegistration.getTarget() instanceof Function) { type = Function.class; @@ -81,6 +81,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry } for (String name : functionRegistration.getNames()) { + addType(name, functionRegistration.getType()); if (functionRegistration.getTarget() instanceof Function) { this.addFunction(name, (Function) functionRegistration.getTarget()); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index f88eb0d9d..a54e888c8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -73,13 +73,9 @@ import org.springframework.util.StringUtils; */ @Configuration @ConditionalOnMissingBean(FunctionCatalog.class) -@ComponentScan( - basePackages = "${spring.cloud.function.scan.packages:functions}", - includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, - classes = { - Supplier.class, - Function.class, - Consumer.class })) +@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", // + includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { + Supplier.class, Function.class, Consumer.class })) public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @@ -89,8 +85,9 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryFunctionCatalog(); } - protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry - implements InitializingBean, BeanFactoryAware { + protected static class BeanFactoryFunctionCatalog + extends AbstractComposableFunctionRegistry + implements InitializingBean, BeanFactoryAware { private ApplicationEventPublisher applicationEventPublisher; @@ -100,13 +97,15 @@ public class ContextFunctionCatalogAutoConfiguration { public FunctionRegistration getRegistration(Object function) { String functionName = this.lookupFunctionName(function); if (StringUtils.hasText(functionName)) { - FunctionRegistration registration = new FunctionRegistration(function, functionName); + FunctionRegistration registration = new FunctionRegistration( + function, functionName); FunctionType functionType = this.findType(registration); return registration.type(functionType.getType()); } return null; } + @Override public void register(FunctionRegistration functionRegistration) { Assert.notEmpty(functionRegistration.getNames(), "'registration' must contain at least one name before it is registered in catalog."); @@ -120,13 +119,13 @@ public class ContextFunctionCatalogAutoConfiguration { @Override @SuppressWarnings("rawtypes") public void afterPropertiesSet() throws Exception { - Map supplierBeans = beanFactory + Map supplierBeans = this.beanFactory .getBeansOfType(Supplier.class); - Map functionBeans = beanFactory + Map functionBeans = this.beanFactory .getBeansOfType(Function.class); - Map consumerBeans = beanFactory + Map consumerBeans = this.beanFactory .getBeansOfType(Consumer.class); - Map functionRegistrationBeans = beanFactory + Map functionRegistrationBeans = this.beanFactory .getBeansOfType(FunctionRegistration.class); this.doMerge(functionRegistrationBeans, consumerBeans, supplierBeans, functionBeans); @@ -168,7 +167,8 @@ public class ContextFunctionCatalogAutoConfiguration { if (functionType == null) { functionType = functionByNameExist(name) - ? new FunctionType(functionRegistration.getTarget().getClass()) : new FunctionType( + ? new FunctionType(functionRegistration.getTarget().getClass()) + : new FunctionType( FunctionContextUtils.findType(name, this.beanFactory)); } @@ -177,6 +177,11 @@ public class ContextFunctionCatalogAutoConfiguration { // @checkstyle:off /** + * @param initial a registration + * @param consumers consumers to register + * @param suppliers suppliers to register + * @param functions functions to register + * @return a new registration * @deprecated Was never intended for public use. */ @Deprecated @@ -253,6 +258,7 @@ public class ContextFunctionCatalogAutoConfiguration { registrations.forEach(registration -> wrap(registration, targets.get(registration.getTarget()))); } + } private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition { @@ -289,10 +295,8 @@ public class ContextFunctionCatalogAutoConfiguration { @Configuration @ConditionalOnClass(ObjectMapper.class) @ConditionalOnBean(ObjectMapper.class) - @ConditionalOnProperty( - name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, - havingValue = "jackson", - matchIfMissing = true) + @ConditionalOnProperty(name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, // + havingValue = "jackson", matchIfMissing = true) protected static class JacksonConfiguration { @Bean diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java index 79f2f8887..edc5f0268 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java @@ -24,6 +24,8 @@ import reactor.core.publisher.Flux; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.core.FluxFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; @@ -63,17 +65,44 @@ public class InMemoryFunctionCatalogTests { @Test public void testFunctionComposition() { FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( - new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class).getType()); + new UpperCase(), "uppercase") + .type(FunctionType.of(UpperCase.class).getType()); FunctionRegistration reverseRegistration = new FunctionRegistration<>( new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType()); InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); catalog.register(upperCaseRegistration); catalog.register(reverseRegistration); - Function, Flux> lookedUpFunction = catalog.lookup("uppercase|reverse"); + Function, Flux> lookedUpFunction = catalog + .lookup("uppercase|reverse"); + assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isFalse(); assertThat(lookedUpFunction).isNotNull(); - assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()).isEqualTo("RATS"); + assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()) + .isEqualTo("RATS"); + } + + @Test + public void testFunctionCompositionWithMessages() { + FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( + new UpperCaseMessage(), "uppercase") + .type(FunctionType.of(UpperCaseMessage.class).getType()); + // TODO: make this work with plain Reverse (not message) + FunctionRegistration reverseRegistration = new FunctionRegistration<>( + new ReverseMessage(), "reverse") + .type(FunctionType.of(ReverseMessage.class).getType()); + InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); + catalog.register(upperCaseRegistration); + catalog.register(reverseRegistration); + + Function>, Flux>> lookedUpFunction = catalog + .lookup("uppercase|reverse"); + assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isTrue(); + + assertThat(lookedUpFunction).isNotNull(); + assertThat(lookedUpFunction + .apply(Flux.just(MessageBuilder.withPayload("star").build())).blockFirst() + .getPayload()).isEqualTo("RATS"); } private static class UpperCase implements Function { @@ -85,6 +114,17 @@ public class InMemoryFunctionCatalogTests { } + private static class UpperCaseMessage + implements Function, Message> { + + @Override + public Message apply(Message t) { + return MessageBuilder.withPayload(t.getPayload().toUpperCase()) + .copyHeaders(t.getHeaders()).build(); + } + + } + private static class Reverse implements Function { @Override @@ -94,6 +134,18 @@ public class InMemoryFunctionCatalogTests { } + private static class ReverseMessage + implements Function, Message> { + + @Override + public Message apply(Message t) { + return MessageBuilder + .withPayload(new StringBuilder(t.getPayload()).reverse().toString()) + .copyHeaders(t.getHeaders()).build(); + } + + } + private static class TestFunction implements Function { @Override diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index eab9473fe..1302abc75 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog; import static org.assertj.core.api.Assertions.assertThat; + /** * @author Dave Syer * @@ -198,7 +199,7 @@ public class BeanFactoryFunctionCatalogTests { Consumer c = x -> ref.set(x.toUpperCase()); this.processor.register(new FunctionRegistration<>(c, "consumer")); Supplier> f = this.processor.lookup("supplier|consumer"); - ((Mono) f.get()).block(); + f.get().block(); assertThat(ref.get()).isEqualTo("HELLO"); } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java index 2459159b7..6a434b32e 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java @@ -24,8 +24,8 @@ import reactor.core.publisher.Flux; * {@link Function} implementation that wraps a target Function so that the target's * simple input and output types will be wrapped as {@link Flux} instances. * - * @param input type of target function - * @param output type of target function + * @param input type of target function + * @param output type of target function * @author Mark Fisher * @author Oleg Zhurakousky */ diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java index a1aeae979..4da1d36d1 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java @@ -24,8 +24,8 @@ import reactor.core.publisher.Flux; * {@link Function} implementation that wraps a target Function so that the target's * simple input and output types will be wrapped as {@link Flux} instances. * - * @param input type of target function - * @param output type of target function + * @param input type of target function + * @param output type of target function * @author Oleg Zhurakousky * @since 2.0.1 */ diff --git a/spring-cloud-function-samples/function-sample-aws/pom.xml b/spring-cloud-function-samples/function-sample-aws/pom.xml index ae18a8167..e79634e26 100644 --- a/spring-cloud-function-samples/function-sample-aws/pom.xml +++ b/spring-cloud-function-samples/function-sample-aws/pom.xml @@ -25,8 +25,7 @@ 1.8 1.0.17.RELEASE 2.0.2 - 2.1.0.BUILD-SNAPSHOT - + 2.1.0.BUILD-SNAPSHOT example.Config