diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index df4d8fa31..1229ff3c3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -50,10 +50,9 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; /** - * Base implementation of {@link FunctionRegistry} which supports - * function composition during lookups. For example if this registry contains - * function 'a' and 'b' you can compose them into a single function by - * simply piping two names together during the + * Base implementation of {@link FunctionRegistry} which supports function composition + * during lookups. For example if this registry contains function 'a' and 'b' you can + * compose them into a single function by simply piping two names together during the * lookup {@code this.lookup(Function.class, "a|b")}. * * Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}). @@ -62,9 +61,8 @@ import org.springframework.util.StringUtils; * @since 2.1 * */ -public abstract class AbstractComposableFunctionRegistry - implements FunctionRegistry, FunctionInspector, - ApplicationEventPublisherAware, EnvironmentAware { +public abstract class AbstractComposableFunctionRegistry implements FunctionRegistry, + FunctionInspector, ApplicationEventPublisherAware, EnvironmentAware { private final Map suppliers = new ConcurrentHashMap<>(); @@ -93,10 +91,12 @@ public abstract class AbstractComposableFunctionRegistry @Override public Set getNames(Class type) { if (type == null) { - return new HashSet(getSupplierNames()) {{ - addAll(getConsumerNames()); - addAll(getFunctionNames()); - } }; + return new HashSet(getSupplierNames()) { + { + addAll(getConsumerNames()); + addAll(getFunctionNames()); + } + }; } if (Supplier.class.isAssignableFrom(type)) { return this.getSupplierNames(); @@ -150,8 +150,10 @@ public abstract class AbstractComposableFunctionRegistry * The count of all Suppliers, Function and Consumers currently registered. * @return the count of all Suppliers, Function and Consumers currently registered. */ + @Override public int size() { - return getSupplierNames().size() + getFunctionNames().size() + getConsumerNames().size(); + return getSupplierNames().size() + getFunctionNames().size() + + getConsumerNames().size(); } public FunctionType getFunctionType(String name) { @@ -159,10 +161,9 @@ public abstract class AbstractComposableFunctionRegistry } /** - * A reverse lookup where one can determine the actual name of - * the function reference. - * @param function should be an instance of {@link Supplier}, - * {@link Function} or {@link Consumer}; + * A reverse lookup where one can determine the actual name of the function reference. + * @param function should be an instance of {@link Supplier}, {@link Function} or + * {@link Consumer}; * @return the name of the function or null. */ public String lookupFunctionName(Object function) { @@ -227,9 +228,10 @@ public abstract class AbstractComposableFunctionRegistry if (functionType != null) { return functionType; } - throw new IllegalStateException("Unless FunctionType is already available in FunctionRegistration, " - + "this operation must be overriden " - + "by the implementation of the FunctionRegistry."); + throw new IllegalStateException( + "Unless FunctionType is already available in FunctionRegistration, " + + "this operation must be overriden " + + "by the implementation of the FunctionRegistry."); } protected void addSupplier(String name, Supplier supplier) { @@ -275,7 +277,7 @@ public abstract class AbstractComposableFunctionRegistry return registration; } - private void addType(String name, FunctionType functionType) { + protected void addType(String name, FunctionType functionType) { this.types.computeIfAbsent(name, str -> functionType); } @@ -308,11 +310,13 @@ public abstract class AbstractComposableFunctionRegistry else { String[] stages = StringUtils.delimitedListToStringArray(name, "|"); if (Stream.of(stages).allMatch(funcName -> contains(funcName))) { - List composableFunctions = Stream.of(stages).map(funcName -> find(funcName)) - .collect(Collectors.toList()); - composedFunction = composableFunctions.stream().reduce((a, z) -> composeFunctions(a, z)) + List composableFunctions = Stream.of(stages) + .map(funcName -> find(funcName)).collect(Collectors.toList()); + composedFunction = composableFunctions.stream() + .reduce((a, z) -> composeFunctions(a, z)) .orElseGet(() -> null); - if (composedFunction != null && !this.types.containsKey(name) && this.types.containsKey(stages[0]) + if (composedFunction != null && !this.types.containsKey(name) + && this.types.containsKey(stages[0]) && this.types.containsKey(stages[stages.length - 1])) { FunctionType input = this.types.get(stages[0]); FunctionType output = this.types.get(stages[stages.length - 1]); @@ -339,17 +343,17 @@ public abstract class AbstractComposableFunctionRegistry } private boolean contains(String name) { - return getSupplierNames().contains(name) - || getFunctionNames().contains(name) || getConsumerNames().contains(name); + return getSupplierNames().contains(name) || getFunctionNames().contains(name) + || getConsumerNames().contains(name); } private Object find(String name) { - Object result = suppliers.get(name); + Object result = this.suppliers.get(name); if (result == null) { - result = functions.get(name); + result = this.functions.get(name); } if (result == null) { - result = consumers.get(name); + result = this.consumers.get(name); } return result; } @@ -361,8 +365,8 @@ public abstract class AbstractComposableFunctionRegistry if (b instanceof FluxConsumer) { if (supplier instanceof FluxSupplier) { FluxConsumer fConsumer = ((FluxConsumer) b); - return (Supplier>) () -> Mono - .from(supplier.get().compose(v -> fConsumer.apply(supplier.get()))); + return (Supplier>) () -> Mono.from( + supplier.get().compose(v -> fConsumer.apply(supplier.get()))); } else { throw new IllegalStateException( @@ -383,13 +387,15 @@ public abstract class AbstractComposableFunctionRegistry return function1.andThen(function2); } else { - throw new IllegalStateException("The provided function is finite (i.e., returns Mono) " - + "therefore it can *only* be composed with compatible function (i.e., Function"); + throw new IllegalStateException( + "The provided function is finite (i.e., returns Mono) " + + "therefore it can *only* be composed with compatible function (i.e., Function"); } } else if (function2 instanceof FluxToMonoFunction) { - return new FluxToMonoFunction(((Function, Flux>) a) - .andThen(((FluxToMonoFunction) b).getTarget())); + return new FluxToMonoFunction( + ((Function, Flux>) a).andThen( + ((FluxToMonoFunction) b).getTarget())); } else { return function1.andThen(function2); @@ -401,8 +407,8 @@ public abstract class AbstractComposableFunctionRegistry return (Consumer) v -> consumer.accept(function.apply(v)); } else { - throw new IllegalArgumentException( - String.format("Could not compose %s and %s", a.getClass(), b.getClass())); + throw new IllegalArgumentException(String + .format("Could not compose %s and %s", a.getClass(), b.getClass())); } } @@ -429,4 +435,5 @@ public abstract class AbstractComposableFunctionRegistry } return function; } + } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java index 75e5c8478..445f59229 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java @@ -56,7 +56,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry Assert.notEmpty(functionRegistration.getNames(), "'registration' must contain at least one name before it is registered in catalog."); // TODO should we just delegate to wrap(..)???? - //wrap(functionRegistration, functionRegistration.getNames().iterator().next()); + // wrap(functionRegistration, functionRegistration.getNames().iterator().next()); Class type = Object.class; if (functionRegistration.getTarget() instanceof Function) { type = Function.class; @@ -81,6 +81,7 @@ public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry } for (String name : functionRegistration.getNames()) { + addType(name, functionRegistration.getType()); if (functionRegistration.getTarget() instanceof Function) { this.addFunction(name, (Function) functionRegistration.getTarget()); } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index f88eb0d9d..a54e888c8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -73,13 +73,9 @@ import org.springframework.util.StringUtils; */ @Configuration @ConditionalOnMissingBean(FunctionCatalog.class) -@ComponentScan( - basePackages = "${spring.cloud.function.scan.packages:functions}", - includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, - classes = { - Supplier.class, - Function.class, - Consumer.class })) +@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", // + includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { + Supplier.class, Function.class, Consumer.class })) public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @@ -89,8 +85,9 @@ public class ContextFunctionCatalogAutoConfiguration { return new BeanFactoryFunctionCatalog(); } - protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry - implements InitializingBean, BeanFactoryAware { + protected static class BeanFactoryFunctionCatalog + extends AbstractComposableFunctionRegistry + implements InitializingBean, BeanFactoryAware { private ApplicationEventPublisher applicationEventPublisher; @@ -100,13 +97,15 @@ public class ContextFunctionCatalogAutoConfiguration { public FunctionRegistration getRegistration(Object function) { String functionName = this.lookupFunctionName(function); if (StringUtils.hasText(functionName)) { - FunctionRegistration registration = new FunctionRegistration(function, functionName); + FunctionRegistration registration = new FunctionRegistration( + function, functionName); FunctionType functionType = this.findType(registration); return registration.type(functionType.getType()); } return null; } + @Override public void register(FunctionRegistration functionRegistration) { Assert.notEmpty(functionRegistration.getNames(), "'registration' must contain at least one name before it is registered in catalog."); @@ -120,13 +119,13 @@ public class ContextFunctionCatalogAutoConfiguration { @Override @SuppressWarnings("rawtypes") public void afterPropertiesSet() throws Exception { - Map supplierBeans = beanFactory + Map supplierBeans = this.beanFactory .getBeansOfType(Supplier.class); - Map functionBeans = beanFactory + Map functionBeans = this.beanFactory .getBeansOfType(Function.class); - Map consumerBeans = beanFactory + Map consumerBeans = this.beanFactory .getBeansOfType(Consumer.class); - Map functionRegistrationBeans = beanFactory + Map functionRegistrationBeans = this.beanFactory .getBeansOfType(FunctionRegistration.class); this.doMerge(functionRegistrationBeans, consumerBeans, supplierBeans, functionBeans); @@ -168,7 +167,8 @@ public class ContextFunctionCatalogAutoConfiguration { if (functionType == null) { functionType = functionByNameExist(name) - ? new FunctionType(functionRegistration.getTarget().getClass()) : new FunctionType( + ? new FunctionType(functionRegistration.getTarget().getClass()) + : new FunctionType( FunctionContextUtils.findType(name, this.beanFactory)); } @@ -177,6 +177,11 @@ public class ContextFunctionCatalogAutoConfiguration { // @checkstyle:off /** + * @param initial a registration + * @param consumers consumers to register + * @param suppliers suppliers to register + * @param functions functions to register + * @return a new registration * @deprecated Was never intended for public use. */ @Deprecated @@ -253,6 +258,7 @@ public class ContextFunctionCatalogAutoConfiguration { registrations.forEach(registration -> wrap(registration, targets.get(registration.getTarget()))); } + } private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition { @@ -289,10 +295,8 @@ public class ContextFunctionCatalogAutoConfiguration { @Configuration @ConditionalOnClass(ObjectMapper.class) @ConditionalOnBean(ObjectMapper.class) - @ConditionalOnProperty( - name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, - havingValue = "jackson", - matchIfMissing = true) + @ConditionalOnProperty(name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, // + havingValue = "jackson", matchIfMissing = true) protected static class JacksonConfiguration { @Bean diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java index 79f2f8887..edc5f0268 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java @@ -24,6 +24,8 @@ import reactor.core.publisher.Flux; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.core.FluxFunction; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; @@ -63,17 +65,44 @@ public class InMemoryFunctionCatalogTests { @Test public void testFunctionComposition() { FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( - new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class).getType()); + new UpperCase(), "uppercase") + .type(FunctionType.of(UpperCase.class).getType()); FunctionRegistration reverseRegistration = new FunctionRegistration<>( new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType()); InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); catalog.register(upperCaseRegistration); catalog.register(reverseRegistration); - Function, Flux> lookedUpFunction = catalog.lookup("uppercase|reverse"); + Function, Flux> lookedUpFunction = catalog + .lookup("uppercase|reverse"); + assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isFalse(); assertThat(lookedUpFunction).isNotNull(); - assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()).isEqualTo("RATS"); + assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()) + .isEqualTo("RATS"); + } + + @Test + public void testFunctionCompositionWithMessages() { + FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( + new UpperCaseMessage(), "uppercase") + .type(FunctionType.of(UpperCaseMessage.class).getType()); + // TODO: make this work with plain Reverse (not message) + FunctionRegistration reverseRegistration = new FunctionRegistration<>( + new ReverseMessage(), "reverse") + .type(FunctionType.of(ReverseMessage.class).getType()); + InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); + catalog.register(upperCaseRegistration); + catalog.register(reverseRegistration); + + Function>, Flux>> lookedUpFunction = catalog + .lookup("uppercase|reverse"); + assertThat(catalog.getFunctionType("uppercase|reverse").isMessage()).isTrue(); + + assertThat(lookedUpFunction).isNotNull(); + assertThat(lookedUpFunction + .apply(Flux.just(MessageBuilder.withPayload("star").build())).blockFirst() + .getPayload()).isEqualTo("RATS"); } private static class UpperCase implements Function { @@ -85,6 +114,17 @@ public class InMemoryFunctionCatalogTests { } + private static class UpperCaseMessage + implements Function, Message> { + + @Override + public Message apply(Message t) { + return MessageBuilder.withPayload(t.getPayload().toUpperCase()) + .copyHeaders(t.getHeaders()).build(); + } + + } + private static class Reverse implements Function { @Override @@ -94,6 +134,18 @@ public class InMemoryFunctionCatalogTests { } + private static class ReverseMessage + implements Function, Message> { + + @Override + public Message apply(Message t) { + return MessageBuilder + .withPayload(new StringBuilder(t.getPayload()).reverse().toString()) + .copyHeaders(t.getHeaders()).build(); + } + + } + private static class TestFunction implements Function { @Override diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index eab9473fe..1302abc75 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -34,6 +34,7 @@ import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog; import static org.assertj.core.api.Assertions.assertThat; + /** * @author Dave Syer * @@ -198,7 +199,7 @@ public class BeanFactoryFunctionCatalogTests { Consumer c = x -> ref.set(x.toUpperCase()); this.processor.register(new FunctionRegistration<>(c, "consumer")); Supplier> f = this.processor.lookup("supplier|consumer"); - ((Mono) f.get()).block(); + f.get().block(); assertThat(ref.get()).isEqualTo("HELLO"); } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java index 2459159b7..6a434b32e 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxFunction.java @@ -24,8 +24,8 @@ import reactor.core.publisher.Flux; * {@link Function} implementation that wraps a target Function so that the target's * simple input and output types will be wrapped as {@link Flux} instances. * - * @param input type of target function - * @param output type of target function + * @param input type of target function + * @param output type of target function * @author Mark Fisher * @author Oleg Zhurakousky */ diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java index a1aeae979..4da1d36d1 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/core/FluxedFunction.java @@ -24,8 +24,8 @@ import reactor.core.publisher.Flux; * {@link Function} implementation that wraps a target Function so that the target's * simple input and output types will be wrapped as {@link Flux} instances. * - * @param input type of target function - * @param output type of target function + * @param input type of target function + * @param output type of target function * @author Oleg Zhurakousky * @since 2.0.1 */ diff --git a/spring-cloud-function-samples/function-sample-aws/pom.xml b/spring-cloud-function-samples/function-sample-aws/pom.xml index ae18a8167..e79634e26 100644 --- a/spring-cloud-function-samples/function-sample-aws/pom.xml +++ b/spring-cloud-function-samples/function-sample-aws/pom.xml @@ -25,8 +25,7 @@ 1.8 1.0.17.RELEASE 2.0.2 - 2.1.0.BUILD-SNAPSHOT - + 2.1.0.BUILD-SNAPSHOT example.Config