diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractFunctionRegistry.java deleted file mode 100644 index c3ed1ae35..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/AbstractFunctionRegistry.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.cloud.function.context.catalog.FunctionInspector; -import org.springframework.context.ApplicationEventPublisher; -import org.springframework.context.ApplicationEventPublisherAware; -import org.springframework.core.env.Environment; -import org.springframework.core.env.StandardEnvironment; -import org.springframework.util.StringUtils; - -/** - * @author Oleg Zhurakousky - * @since 2.0.1 - * - */ -public abstract class AbstractFunctionRegistry - implements FunctionRegistry, FunctionInspector, ApplicationEventPublisherAware { - - @Autowired - private Environment environment = new StandardEnvironment(); - - protected ApplicationEventPublisher applicationEventPublisher; - - public T lookup(Class type, String name) { - String functionDefinitionName = !StringUtils.hasText(name) - && this.environment.containsProperty("spring.cloud.function.definition") - ? this.environment.getProperty("spring.cloud.function.definition") - : name; - return this.doLookup(type, functionDefinitionName); - } - - protected abstract T doLookup(Class type, String name); - - @Override - public void setApplicationEventPublisher( - ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java new file mode 100644 index 000000000..6228ce54f --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -0,0 +1,417 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog; + +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.core.FluxConsumer; +import org.springframework.cloud.function.core.FluxSupplier; +import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.IsolatedConsumer; +import org.springframework.cloud.function.core.IsolatedFunction; +import org.springframework.cloud.function.core.IsolatedSupplier; +import org.springframework.cloud.function.core.MonoToFluxFunction; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.context.EnvironmentAware; +import org.springframework.core.env.Environment; +import org.springframework.core.env.StandardEnvironment; +import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; + +/** + * Base implementation of {@link FunctionRegistry} which supports + * function composition during lookups. For example if this registry contains + * function 'a' and 'b' you can compose them into a single function by + * simply piping two names together during the + * lookup {@code this.lookup(Function.class, "a|b")}. + * + * Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}). + * + * @author Oleg Zhurakousky + * @since 2.1 + * + */ +public abstract class AbstractComposableFunctionRegistry + implements FunctionRegistry, FunctionInspector, + ApplicationEventPublisherAware, EnvironmentAware { + + private Map suppliers = new ConcurrentHashMap<>(); + + private Map functions = new ConcurrentHashMap<>(); + + private Map consumers = new ConcurrentHashMap<>(); + + private Map names = new ConcurrentHashMap<>(); + + private Map types = new ConcurrentHashMap<>(); + + private Environment environment = new StandardEnvironment(); + + protected ApplicationEventPublisher applicationEventPublisher; + + @Override + public T lookup(Class type, String name) { + String functionDefinitionName = !StringUtils.hasText(name) + && this.environment.containsProperty("spring.cloud.function.definition") + ? this.environment.getProperty("spring.cloud.function.definition") + : name; + return this.doLookup(type, functionDefinitionName); + } + + @Override + public void setApplicationEventPublisher( + ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + + @Override + public void setEnvironment(Environment environment) { + this.environment = environment; + } + + @Override + public Set getNames(Class type) { + if (type == null) { //perhaps some synchronization + Set names = new HashSet<>(suppliers.keySet()); + names.addAll(functions.keySet()); + names.addAll(consumers.keySet()); + return names; + } + if (Supplier.class.isAssignableFrom(type)) { + return this.getSupplierNames(); + } + if (Consumer.class.isAssignableFrom(type)) { + return this.getConsumerNames(); + } + if (Function.class.isAssignableFrom(type)) { + return this.getFunctionNames(); + } + return Collections.emptySet(); + } + + public Set getSupplierNames() { + return this.suppliers.keySet(); + } + + public Set getFunctionNames() { + return this.functions.keySet(); + } + + public Set getConsumerNames() { + return this.consumers.keySet(); + } + + public boolean hasSuppliers() { + return !CollectionUtils.isEmpty(this.suppliers); + } + + public boolean hasFunctions() { + return !CollectionUtils.isEmpty(this.functions); + } + + public boolean hasConsumers() { + return !CollectionUtils.isEmpty(this.consumers); + } + + /** + * The count of all Suppliers, Function and Consumers currently registered. + * @return the count of all Suppliers, Function and Consumers currently registered. + */ + public int size() { + return this.suppliers.size() + this.functions.size() + this.consumers.size(); + } + + public FunctionType getFunctionType(String name) { + return this.types.get(name); + } + + /** + * A reverse lookup where one can determine the actual name of + * the function reference. + * @param function should be an instance of {@link Supplier}, + * {@link Function} or {@link Consumer}; + * @return the name of the function or null. + */ + public String lookupFunctionName(Object function) { + return this.names.containsKey(function) ? this.names.get(function) : null; + } + + public void addSupplier(String name, Object supplier) { + this.suppliers.put(name, supplier); + this.addName(supplier, name); + } + + public void addFunction(String name, Object function) { + this.functions.put(name, function); + this.addName(function, name); + } + + public void addConsumer(String name, Object consumer) { + this.consumers.put(name, consumer); + this.addName(consumer, name); + } + + protected void wrap(FunctionRegistration registration, String key) { + Object target = registration.getTarget(); + this.addName(target, key); + if (registration.getType() != null) { + this.addType(key, registration.getType()); + } + else { + FunctionType functionType = findType(target); + this.addType(key, functionType); + registration.type(functionType.getType()); + } + Class type; + registration = isolated(registration).wrap(); + target = registration.getTarget(); + if (target instanceof Supplier) { + type = Supplier.class; + for (String name : registration.getNames()) { + this.addSupplier(name, registration.getTarget()); + } + } + else if (target instanceof Consumer) { + type = Consumer.class; + for (String name : registration.getNames()) { + this.addConsumer(name, registration.getTarget()); + } + } + else if (target instanceof Function) { + type = Function.class; + for (String name : registration.getNames()) { + this.addFunction(name, registration.getTarget()); + } + } + else { + return; + } + this.addName(registration.getTarget(), key); + if (this.applicationEventPublisher != null) { + this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent( + registration.getTarget(), type, registration.getNames())); + } + } + + protected FunctionType findType(Object function) { + throw new UnsupportedOperationException("There is no default " + + "implementation of this operation. It must be overriden " + + "by the implementation of FunctionRegistry."); + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + private Object doLookup(String name, Map lookup, Class typeOfFunction) { + Object function = compose(name, lookup); + if (function != null && typeOfFunction.isAssignableFrom(function.getClass())) { + return function; + } + return null; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private FunctionRegistration isolated(FunctionRegistration input) { + FunctionRegistration registration = (FunctionRegistration) input; + Object target = registration.getTarget(); + boolean isolated = getClass().getClassLoader() != target.getClass() + .getClassLoader(); + if (isolated) { + if (target instanceof Supplier && isolated) { + target = new IsolatedSupplier((Supplier) target); + } + else if (target instanceof Function) { + target = new IsolatedFunction((Function) target); + } + else if (target instanceof Consumer) { + target = new IsolatedConsumer((Consumer) target); + } + } + + registration.target(target); + return registration; + } + + private void addType(String name, FunctionType functionType) { + this.types.computeIfAbsent(name, str -> functionType); + } + + private void addName(Object function, String name) { + this.names.put(function, name); + } + + private Supplier lookupSupplier(String name) { + return (Supplier) doLookup(name, this.suppliers, Supplier.class); + } + + private Function lookupFunction(String name) { + return (Function) doLookup(name, this.functions, Function.class); + } + + private Consumer lookupConsumer(String name) { + return (Consumer) doLookup(name, this.consumers, Consumer.class); + } + + private Object compose(String name, Map lookup) { + name = normalizeName(name); + Object composedFunction = null; + if (lookup.containsKey(name)) { + composedFunction = lookup.get(name); + } + else { + if (name.equals("") && lookup.size() == 1) { + composedFunction = lookup.values().iterator().next(); + } + else { + String[] stages = StringUtils.delimitedListToStringArray(name, "|"); + if (Stream.of(stages).allMatch(funcName -> contains(funcName))) { + List composableFunctions = Stream.of(stages).map(funcName -> find(funcName)) + .collect(Collectors.toList()); + composedFunction = composableFunctions.stream().reduce((a, z) -> composeFunctions(a, z)) + .orElseGet(() -> null); + if (composedFunction != null && !this.types.containsKey(name) && this.types.containsKey(stages[0]) + && this.types.containsKey(stages[stages.length - 1])) { + FunctionType input = this.types.get(stages[0]); + FunctionType output = this.types.get(stages[stages.length - 1]); + this.types.put(name, FunctionType.compose(input, output)); + this.names.put(composedFunction, name); + if (composedFunction instanceof Function) { + this.functions.put(name, composedFunction); + } + else if (composedFunction instanceof Consumer) { + this.consumers.put(name, composedFunction); + } + else if (composedFunction instanceof Supplier) { + this.suppliers.put(name, composedFunction); + } + } + } + } + } + return composedFunction; + } + + private String normalizeName(String name) { + return name.replaceAll(",", "|").trim(); + } + + private boolean contains(String name) { + return suppliers.containsKey(name) || functions.containsKey(name) || consumers.containsKey(name); + } + + private Object find(String name) { + Object result = suppliers.get(name); + if (result == null) { + result = functions.get(name); + } + if (result == null) { + result = consumers.get(name); + } + return result; + } + + @SuppressWarnings("unchecked") + private Object composeFunctions(Object a, Object b) { + if (a instanceof Supplier && b instanceof Function) { + Supplier> supplier = (Supplier>) a; + if (b instanceof FluxConsumer) { + if (supplier instanceof FluxSupplier) { + FluxConsumer fConsumer = ((FluxConsumer) b); + return (Supplier>) () -> Mono + .from(supplier.get().compose(v -> fConsumer.apply(supplier.get()))); + } + else { + throw new IllegalStateException( + "The provided supplier is finite (i.e., already composed with Consumer) " + + "therefore it can not be composed with another consumer"); + } + } + else { + Function function = (Function) b; + return (Supplier) () -> function.apply(supplier.get()); + } + } + else if (a instanceof Function && b instanceof Function) { + Function function1 = (Function) a; + Function function2 = (Function) b; + if (function1 instanceof FluxToMonoFunction) { + if (function2 instanceof MonoToFluxFunction) { + return function1.andThen(function2); + } + else { + throw new IllegalStateException("The provided function is finite (i.e., returns Mono) " + + "therefore it can *only* be composed with compatible function (i.e., Function"); + } + } + else if (function2 instanceof FluxToMonoFunction) { + return new FluxToMonoFunction(((Function, Flux>) a) + .andThen(((FluxToMonoFunction) b).getTarget())); + } + else { + return function1.andThen(function2); + } + } + else if (a instanceof Function && b instanceof Consumer) { + Function function = (Function) a; + Consumer consumer = (Consumer) b; + return (Consumer) v -> consumer.accept(function.apply(v)); + } + else { + throw new IllegalArgumentException( + String.format("Could not compose %s and %s", a.getClass(), b.getClass())); + } + } + + @SuppressWarnings("unchecked") + private T doLookup(Class type, String name) { + T function = null; + if (type == null) { + function = (T) this.lookupFunction(name); + if (function == null) { + function = (T) this.lookupConsumer(name); + } + if (function == null) { + function = (T) this.lookupSupplier(name); + } + } + else if (Function.class.isAssignableFrom(type)) { + function = (T) this.lookupFunction(name); + } + else if (Supplier.class.isAssignableFrom(type)) { + function = (T) this.lookupSupplier(name); + } + else if (Consumer.class.isAssignableFrom(type)) { + function = (T) this.lookupConsumer(name); + } + return function; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java index 8c9e12c9e..cf6a69fa3 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalog.java @@ -23,12 +23,7 @@ import java.util.Set; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; - -import org.springframework.cloud.function.context.AbstractFunctionRegistry; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.util.Assert; @@ -37,9 +32,7 @@ import org.springframework.util.Assert; * @author Mark Fisher * @author Oleg Zhurakousky */ -public class InMemoryFunctionCatalog extends AbstractFunctionRegistry { - - private final Map, Map> functions; +public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry { private final Map> registrations; @@ -49,7 +42,6 @@ public class InMemoryFunctionCatalog extends AbstractFunctionRegistry { public InMemoryFunctionCatalog(Set> registrations) { Assert.notNull(registrations, "'registrations' must not be null"); - this.functions = new HashMap<>(); this.registrations = new HashMap<>(); registrations.stream().forEach(reg -> register(reg)); } @@ -85,64 +77,13 @@ public class InMemoryFunctionCatalog extends AbstractFunctionRegistry { type = Function.class; } } - Map map = this.functions.computeIfAbsent(type, - key -> new HashMap<>()); + for (String name : registration.getNames()) { - map.put(name, registration.getTarget()); + this.addFunction(name, registration.getTarget()); } this.publishEvent(event); } - @PostConstruct - public void init() { - if (this.applicationEventPublisher != null && !this.functions.isEmpty()) { - this.functions.keySet() - .forEach(type -> this.publishEvent(new FunctionRegistrationEvent(this, - type, this.functions.get(type).keySet()))); - } - } - - @PreDestroy - public void close() { - if (this.applicationEventPublisher != null && !this.functions.isEmpty()) { - this.functions.keySet().forEach( - type -> this.publishEvent(new FunctionUnregistrationEvent(this, type, - this.functions.get(type).keySet()))); - } - } - - @Override - @SuppressWarnings("unchecked") - public T doLookup(Class type, String name) { - T function = null; - if (type == null) { - function = (T) this.functions.values().stream() - .filter(map -> map.get(name) != null).map(map -> map.get(name)) - .findFirst().orElse(null); - } - else { - function = (T) this.extractTypeMap(type).get(name); - } - return function; - } - - @Override - public Set getNames(Class type) { - if (type == null) { - return this.functions.values().stream().flatMap(map -> map.keySet().stream()) - .collect(Collectors.toSet()); - } - Map map = this.extractTypeMap(type); - return map == null ? Collections.emptySet() : map.keySet(); - } - - private Map extractTypeMap(Class type) { - return this.functions.keySet().stream() - .filter(key -> key != Object.class && key.isAssignableFrom(type)) - .map(key -> this.functions.get(key)).findFirst() - .orElse(this.functions.get(Object.class)); - } - private void publishEvent(Object event) { if (this.applicationEventPublisher != null) { this.applicationEventPublisher.publishEvent(event); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java index dab625ac1..fe1c34c0a 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ContextFunctionCatalogAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2016-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,26 +18,20 @@ package org.springframework.cloud.function.context.config; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; -import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.PreDestroy; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; @@ -51,20 +45,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.cloud.function.context.AbstractFunctionRegistry; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; -import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent; +import org.springframework.cloud.function.context.catalog.AbstractComposableFunctionRegistry; import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent; -import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxSupplier; -import org.springframework.cloud.function.core.FluxToMonoFunction; -import org.springframework.cloud.function.core.IsolatedConsumer; -import org.springframework.cloud.function.core.IsolatedFunction; -import org.springframework.cloud.function.core.IsolatedSupplier; -import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.cloud.function.json.GsonMapper; import org.springframework.cloud.function.json.JacksonMapper; import org.springframework.context.ApplicationEventPublisher; @@ -76,7 +62,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.FilterType; import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.core.type.StandardMethodMetadata; -import org.springframework.stereotype.Component; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -88,103 +73,44 @@ import org.springframework.util.StringUtils; */ @Configuration @ConditionalOnMissingBean(FunctionCatalog.class) -// @checkstyle:off -@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = { - Supplier.class, Function.class, Consumer.class })) -// @checkstyle:on +@ComponentScan( + basePackages = "${spring.cloud.function.scan.packages:functions}", + includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, + classes = { + Supplier.class, + Function.class, + Consumer.class })) public class ContextFunctionCatalogAutoConfiguration { static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper"; @Bean - public FunctionRegistry functionCatalog(ContextFunctionRegistry processor) { - return new BeanFactoryFunctionCatalog(processor); + public FunctionRegistry functionCatalog() { + return new BeanFactoryFunctionCatalog(); } - protected static class BeanFactoryFunctionCatalog extends AbstractFunctionRegistry { - - private final ContextFunctionRegistry processor; - - public BeanFactoryFunctionCatalog(ContextFunctionRegistry processor) { - this.processor = processor; - } - - @Override - public FunctionRegistration getRegistration(Object function) { - return function == null ? null : this.processor.getRegistration(function); - } - - @Override - public void register(FunctionRegistration registration) { - Assert.notEmpty(registration.getNames(), - "'registration' must contain at least one name before it is registered in catalog."); - this.processor.register(registration); - } - - @Override - public Set getNames(Class type) { - if (Supplier.class.isAssignableFrom(type)) { - return this.processor.suppliers.keySet(); - } - if (Consumer.class.isAssignableFrom(type)) { - return this.processor.consumers.keySet(); - } - if (Function.class.isAssignableFrom(type)) { - return this.processor.functions.keySet(); - } - return Collections.emptySet(); - } - - @Override - public int size() { - return this.processor.suppliers.size() + this.processor.functions.size() - + this.processor.consumers.size(); - } - - @Override - @SuppressWarnings("unchecked") - protected T doLookup(Class type, String name) { - T function = null; - if (type == null) { - function = (T) this.processor.lookupFunction(name); - if (function == null) { - function = (T) this.processor.lookupConsumer(name); - } - if (function == null) { - function = (T) this.processor.lookupSupplier(name); - } - } - else if (Function.class.isAssignableFrom(type)) { - function = (T) this.processor.lookupFunction(name); - } - else if (Supplier.class.isAssignableFrom(type)) { - function = (T) this.processor.lookupSupplier(name); - } - else if (Consumer.class.isAssignableFrom(type)) { - function = (T) this.processor.lookupConsumer(name); - } - return function; - } - - } - - @Component - protected static class ContextFunctionRegistry - implements InitializingBean, BeanFactoryAware { + protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry + implements InitializingBean, BeanFactoryAware { private ApplicationEventPublisher applicationEventPublisher; private ConfigurableListableBeanFactory beanFactory; - private Map suppliers = new ConcurrentHashMap<>(); + @Override + public FunctionRegistration getRegistration(Object function) { + String functionName = this.lookupFunctionName(function); + if (StringUtils.hasText(functionName)) { + return new FunctionRegistration<>(function, functionName) + .type(findType(function).getType()); + } + return null; + } - private Map functions = new ConcurrentHashMap<>(); - - private Map consumers = new ConcurrentHashMap<>(); - - private Map names = new ConcurrentHashMap<>(); - - private Map types = new ConcurrentHashMap<>(); + public void register(FunctionRegistration functionRegistration) { + Assert.notEmpty(functionRegistration.getNames(), + "'registration' must contain at least one name before it is registered in catalog."); + wrap(functionRegistration, functionRegistration.getNames().iterator().next()); + } /** * Will collect all suppliers, functions, consumers and function registration as @@ -213,46 +139,36 @@ public class ContextFunctionCatalogAutoConfiguration { @PreDestroy public void close() { if (this.applicationEventPublisher != null) { - if (!this.functions.isEmpty()) { + if (this.hasFunctions()) { this.applicationEventPublisher .publishEvent(new FunctionUnregistrationEvent(this, - Function.class, this.functions.keySet())); + Function.class, this.getFunctionNames())); } - if (!this.consumers.isEmpty()) { + if (this.hasConsumers()) { this.applicationEventPublisher .publishEvent(new FunctionUnregistrationEvent(this, - Consumer.class, this.consumers.keySet())); + Consumer.class, this.getConsumerNames())); } - if (!this.suppliers.isEmpty()) { + if (this.hasSuppliers()) { this.applicationEventPublisher .publishEvent(new FunctionUnregistrationEvent(this, - Supplier.class, this.suppliers.keySet())); + Supplier.class, this.getSupplierNames())); } } } - void register(FunctionRegistration function) { - wrap(function, function.getNames().iterator().next()); - } + @Override + protected FunctionType findType(Object function) { + String name = this.lookupFunctionName(function); + FunctionType functionType = this.getFunctionType(name); - FunctionRegistration getRegistration(Object function) { - if (names.containsKey(function)) { - return new FunctionRegistration<>(function, this.names.get(function)) - .type(findType(function).getType()); + if (functionType == null) { + functionType = functionByNameExist(name) + ? new FunctionType(function.getClass()) : new FunctionType( + FunctionContextUtils.findType(name, this.beanFactory)); } - return null; - } - Supplier lookupSupplier(String name) { - return (Supplier) lookup(name, this.suppliers, Supplier.class); - } - - Function lookupFunction(String name) { - return (Function) lookup(name, this.functions, Function.class); - } - - Consumer lookupConsumer(String name) { - return (Consumer) lookup(name, this.consumers, Consumer.class); + return functionType; } // @checkstyle:off @@ -269,135 +185,6 @@ public class ContextFunctionCatalogAutoConfiguration { } // @checkstyle:on - @SuppressWarnings("unchecked") - private Object lookup(String name, @SuppressWarnings("rawtypes") Map lookup, - Class typeOfFunction) { - Object function = compose(name, lookup); - if (function != null - && typeOfFunction.isAssignableFrom(function.getClass())) { - return function; - } - return null; - } - - private String normalizeName(String name) { - return name.replaceAll(",", "|").trim(); - } - - private Object compose(String name, Map lookup) { - name = normalizeName(name); - Object composedFunction = null; - if (lookup.containsKey(name)) { - composedFunction = lookup.get(name); - } - else { - if (name.equals("") && lookup.size() == 1) { - composedFunction = lookup.values().iterator().next(); - } - else { - String[] stages = StringUtils.delimitedListToStringArray(name, "|"); - if (Stream.of(stages).allMatch(funcName -> contains(funcName))) { - List composableFunctions = Stream.of(stages) - .map(funcName -> find(funcName)) - .collect(Collectors.toList()); - composedFunction = composableFunctions.stream() - .reduce((a, z) -> composeFunctions(a, z)) - .orElseGet(() -> null); - if (composedFunction != null && !this.types.containsKey(name) - && this.types.containsKey(stages[0]) - && this.types.containsKey(stages[stages.length - 1])) { - FunctionType input = this.types.get(stages[0]); - FunctionType output = this.types - .get(stages[stages.length - 1]); - this.types.put(name, FunctionType.compose(input, output)); - this.names.put(composedFunction, name); - if (composedFunction instanceof Function) { - this.functions.put(name, composedFunction); - } - else if (composedFunction instanceof Consumer) { - this.consumers.put(name, composedFunction); - } - else if (composedFunction instanceof Supplier) { - this.suppliers.put(name, composedFunction); - } - } - } - } - } - return composedFunction; - } - - private boolean contains(String name) { - return suppliers.containsKey(name) || functions.containsKey(name) - || consumers.containsKey(name); - } - - private Object find(String name) { - Object result = suppliers.get(name); - if (result == null) { - result = functions.get(name); - } - if (result == null) { - result = consumers.get(name); - } - return result; - } - - @SuppressWarnings("unchecked") - private Object composeFunctions(Object a, Object b) { - if (a instanceof Supplier && b instanceof Function) { - Supplier> supplier = (Supplier>) a; - if (b instanceof FluxConsumer) { - if (supplier instanceof FluxSupplier) { - FluxConsumer fConsumer = ((FluxConsumer) b); - return (Supplier>) () -> Mono.from(supplier.get() - .compose(v -> fConsumer.apply(supplier.get()))); - } - else { - throw new IllegalStateException( - "The provided supplier is finite (i.e., already composed with Consumer) " - + "therefore it can not be composed with another consumer"); - } - } - else { - Function function = (Function) b; - return (Supplier) () -> function.apply(supplier.get()); - } - } - else if (a instanceof Function && b instanceof Function) { - Function function1 = (Function) a; - Function function2 = (Function) b; - if (function1 instanceof FluxToMonoFunction) { - if (function2 instanceof MonoToFluxFunction) { - return function1.andThen(function2); - } - else { - throw new IllegalStateException( - "The provided function is finite (i.e., returns Mono) " - + "therefore it can *only* be composed with compatible function (i.e., Function"); - } - } - else if (function2 instanceof FluxToMonoFunction) { - return new FluxToMonoFunction( - ((Function, Flux>) a) - .andThen(((FluxToMonoFunction) b) - .getTarget())); - } - else { - return function1.andThen(function2); - } - } - else if (a instanceof Function && b instanceof Consumer) { - Function function = (Function) a; - Consumer consumer = (Consumer) b; - return (Consumer) v -> consumer.accept(function.apply(v)); - } - else { - throw new IllegalArgumentException(String.format( - "Could not compose %s and %s", a.getClass(), b.getClass())); - } - } - private Collection getAliases(String key) { Collection names = new LinkedHashSet<>(); String value = getQualifier(key); @@ -408,67 +195,67 @@ public class ContextFunctionCatalogAutoConfiguration { return names; } - private void wrap(FunctionRegistration registration, String key) { - Object target = registration.getTarget(); - this.names.put(target, key); - if (registration.getType() != null) { - this.types.put(key, registration.getType()); - } - else { - registration.type(findType(target).getType()); - } - Class type; - registration = isolated(registration).wrap(); - target = registration.getTarget(); - if (target instanceof Supplier) { - type = Supplier.class; - for (String name : registration.getNames()) { - this.suppliers.put(name, registration.getTarget()); - } - } - else if (target instanceof Consumer) { - type = Consumer.class; - for (String name : registration.getNames()) { - this.consumers.put(name, registration.getTarget()); - } - } - else if (target instanceof Function) { - type = Function.class; - for (String name : registration.getNames()) { - this.functions.put(name, registration.getTarget()); - } - } - else { - return; - } - this.names.put(registration.getTarget(), key); - if (this.applicationEventPublisher != null) { - this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent( - registration.getTarget(), type, registration.getNames())); - } - } +// private void wrap(FunctionRegistration registration, String key) { +// Object target = registration.getTarget(); +//// this.addName(target, key); +// if (registration.getType() != null) { +// this.addType(key, registration.getType()); +// } +// else { +// registration.type(findType(target).getType()); +// } +// Class type; +// registration = isolated(registration).wrap(); +// target = registration.getTarget(); +// if (target instanceof Supplier) { +// type = Supplier.class; +// for (String name : registration.getNames()) { +// this.addSupplier(name, registration.getTarget()); +// } +// } +// else if (target instanceof Consumer) { +// type = Consumer.class; +// for (String name : registration.getNames()) { +// this.addConsumer(name, registration.getTarget()); +// } +// } +// else if (target instanceof Function) { +// type = Function.class; +// for (String name : registration.getNames()) { +// this.addFunction(name, registration.getTarget()); +// } +// } +// else { +// return; +// } +// //this.addName(registration.getTarget(), key); +// if (this.applicationEventPublisher != null) { +// this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent( +// registration.getTarget(), type, registration.getNames())); +// } +// } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private FunctionRegistration isolated(FunctionRegistration input) { - FunctionRegistration registration = (FunctionRegistration) input; - Object target = registration.getTarget(); - boolean isolated = getClass().getClassLoader() != target.getClass() - .getClassLoader(); - if (isolated) { - if (target instanceof Supplier && isolated) { - target = new IsolatedSupplier((Supplier) target); - } - else if (target instanceof Function) { - target = new IsolatedFunction((Function) target); - } - else if (target instanceof Consumer) { - target = new IsolatedConsumer((Consumer) target); - } - } - - registration.target(target); - return registration; - } +// @SuppressWarnings({ "rawtypes", "unchecked" }) +// private FunctionRegistration isolated(FunctionRegistration input) { +// FunctionRegistration registration = (FunctionRegistration) input; +// Object target = registration.getTarget(); +// boolean isolated = getClass().getClassLoader() != target.getClass() +// .getClassLoader(); +// if (isolated) { +// if (target instanceof Supplier && isolated) { +// target = new IsolatedSupplier((Supplier) target); +// } +// else if (target instanceof Function) { +// target = new IsolatedFunction((Function) target); +// } +// else if (target instanceof Consumer) { +// target = new IsolatedConsumer((Consumer) target); +// } +// } +// +// registration.target(target); +// return registration; +// } private String getQualifier(String key) { if (this.beanFactory != null @@ -487,22 +274,6 @@ public class ContextFunctionCatalogAutoConfiguration { return key; } - private FunctionType findType(Object function) { - String name = this.names.get(function); - FunctionType functionType; - if (this.types.containsKey(name)) { - functionType = this.types.get(name); - } - else { - functionType = functionByNameExist(name) - ? new FunctionType(function.getClass()) : new FunctionType( - FunctionContextUtils.findType(name, this.beanFactory)); - this.types.computeIfAbsent(name, str -> functionType); - } - - return functionType; - } - private boolean functionByNameExist(String name) { return name == null || this.beanFactory == null || !this.beanFactory.containsBeanDefinition(name); @@ -540,7 +311,6 @@ public class ContextFunctionCatalogAutoConfiguration { registrations.forEach(registration -> wrap(registration, targets.get(registration.getTarget()))); } - } private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition { @@ -577,9 +347,10 @@ public class ContextFunctionCatalogAutoConfiguration { @Configuration @ConditionalOnClass(ObjectMapper.class) @ConditionalOnBean(ObjectMapper.class) - // @checkstyle:off - @ConditionalOnProperty(name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, havingValue = "jackson", matchIfMissing = true) - // @checkstyle:on + @ConditionalOnProperty( + name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, + havingValue = "jackson", + matchIfMissing = true) protected static class JacksonConfiguration { @Bean diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java index 38e217abd..79f2f8887 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/InMemoryFunctionCatalogTests.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.catalog; import java.util.function.Function; import org.junit.Test; +import reactor.core.publisher.Flux; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; @@ -59,6 +60,40 @@ public class InMemoryFunctionCatalogTests { assertThat(lookedUpFunction instanceof FluxFunction).isTrue(); } + @Test + public void testFunctionComposition() { + FunctionRegistration upperCaseRegistration = new FunctionRegistration<>( + new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class).getType()); + FunctionRegistration reverseRegistration = new FunctionRegistration<>( + new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType()); + InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog(); + catalog.register(upperCaseRegistration); + catalog.register(reverseRegistration); + + Function, Flux> lookedUpFunction = catalog.lookup("uppercase|reverse"); + + assertThat(lookedUpFunction).isNotNull(); + assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()).isEqualTo("RATS"); + } + + private static class UpperCase implements Function { + + @Override + public String apply(String t) { + return t.toUpperCase(); + } + + } + + private static class Reverse implements Function { + + @Override + public String apply(String t) { + return new StringBuilder(t).reverse().toString(); + } + + } + private static class TestFunction implements Function { @Override diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java index e8572ee1a..eab9473fe 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/BeanFactoryFunctionCatalogTests.java @@ -32,18 +32,15 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog; -import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry; import static org.assertj.core.api.Assertions.assertThat; - /** * @author Dave Syer * */ public class BeanFactoryFunctionCatalogTests { - private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog( - new ContextFunctionRegistry()); + private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog(); @Test public void basicRegistrationFeatures() { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java index c63ca0b5f..b7688bcfd 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/ContextFunctionPostProcessorTests.java @@ -35,7 +35,7 @@ import reactor.core.publisher.Mono; import org.springframework.beans.BeanUtils; import org.springframework.cloud.function.context.FunctionRegistration; -import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry; +import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog; import org.springframework.test.util.ReflectionTestUtils; import org.springframework.util.ClassUtils; @@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThat; // for functions with the same name, uncomposable combinations) public class ContextFunctionPostProcessorTests { - private ContextFunctionRegistry processor = new ContextFunctionRegistry(); + private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog(); private URLClassLoader classLoader; @@ -70,10 +70,11 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(new Foos(), "foos")); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos"); + .lookup(null, "foos"); //lookupFunction("foos"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); } + @SuppressWarnings("deprecation") @Test public void registrationThroughMerge() { FunctionRegistration registration = new FunctionRegistration<>(new Foos(), @@ -82,17 +83,18 @@ public class ContextFunctionPostProcessorTests { Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos"); + .lookup(null, "foos"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); } + @SuppressWarnings("deprecation") @Test public void registrationThroughMergeFromNamedFunction() { this.processor.merge(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.singletonMap("foos", new Foos())); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos"); + .lookup(null, "foos"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); } @@ -102,7 +104,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(new Bars(), "bars")); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos,bars"); + .lookup(null, "foos,bars"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4"); assertThat(this.processor.getRegistration(foos).getNames()) .containsExactly("foos|bars"); @@ -116,7 +118,7 @@ public class ContextFunctionPostProcessorTests { (x) -> x.toUpperCase(), "function")); @SuppressWarnings("unchecked") Supplier> supplier = (Supplier>) this.processor - .lookupSupplier("supplier|function"); + .lookup(Supplier.class, "supplier|function"); assertThat(supplier.get().blockFirst()).isEqualTo("FOO"); assertThat(this.processor.getRegistration(supplier).getNames()) .containsExactly("supplier|function"); @@ -130,7 +132,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration>( System.out::println, "consumer")); Supplier> supplier = (Supplier>) this.processor - .lookupSupplier("supplier|consumer"); + .lookup(Supplier.class, "supplier|consumer"); assertThat(supplier.get().block()).isNull(); } @@ -140,7 +142,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(new Bars(), "bars")); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos|bars"); + .lookup(null, "foos|bars"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4"); assertThat(this.processor.getRegistration(foos).getNames()) .containsExactly("foos|bars"); @@ -152,7 +154,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(new Foos(), "foos")); @SuppressWarnings("unchecked") Supplier> foos = (Supplier>) this.processor - .lookupSupplier("ints|foos"); + .lookup(Supplier.class, "ints|foos"); assertThat(foos.get().blockFirst()).isEqualTo("8"); assertThat(this.processor.getRegistration(foos).getNames()) .containsExactly("ints|foos"); @@ -167,7 +169,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(create(Foos.class), "foos")); @SuppressWarnings("unchecked") Function, Flux> foos = (Function, Flux>) this.processor - .lookupFunction("foos"); + .lookup(null, "foos"); assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4"); } @@ -179,7 +181,7 @@ public class ContextFunctionPostProcessorTests { .register(new FunctionRegistration<>(create(Source.class), "source")); @SuppressWarnings("unchecked") Supplier> source = (Supplier>) this.processor - .lookupSupplier("source"); + .lookup(Supplier.class, "source"); assertThat(source.get().blockFirst()).isEqualTo(4); } @@ -191,7 +193,7 @@ public class ContextFunctionPostProcessorTests { this.processor.register(new FunctionRegistration<>(target, "sink")); @SuppressWarnings("unchecked") Function, Mono> sink = (Function, Mono>) this.processor - .lookupFunction("sink"); + .lookup(null, "sink"); sink.apply(Flux.just("Hello")).subscribe(); @SuppressWarnings("unchecked") List values = (List) ReflectionTestUtils.getField(target,