GH-261 Added support for function composition InMemoryFunctionCatalog

- Refactored InMemoryFunctionCatalog and BeanFactoryFunctionCatalog into common catalog implementation
- Added initial test

Resolves #261
This commit is contained in:
Oleg Zhurakousky
2019-02-18 16:31:06 +01:00
parent 217ca065f3
commit 0d7e0cc57c
7 changed files with 577 additions and 470 deletions

View File

@@ -1,56 +0,0 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
/**
* @author Oleg Zhurakousky
* @since 2.0.1
*
*/
public abstract class AbstractFunctionRegistry
implements FunctionRegistry, FunctionInspector, ApplicationEventPublisherAware {
@Autowired
private Environment environment = new StandardEnvironment();
protected ApplicationEventPublisher applicationEventPublisher;
public <T> T lookup(Class<?> type, String name) {
String functionDefinitionName = !StringUtils.hasText(name)
&& this.environment.containsProperty("spring.cloud.function.definition")
? this.environment.getProperty("spring.cloud.function.definition")
: name;
return this.doLookup(type, functionDefinitionName);
}
protected abstract <T> T doLookup(Class<?> type, String name);
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}

View File

@@ -0,0 +1,417 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
import org.springframework.cloud.function.core.IsolatedSupplier;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
* Base implementation of {@link FunctionRegistry} which supports
* function composition during lookups. For example if this registry contains
* function 'a' and 'b' you can compose them into a single function by
* simply piping two names together during the
* lookup {@code this.lookup(Function.class, "a|b")}.
*
* Comma ',' is also supported as composition delimiter (e.g., {@code "a,b"}).
*
* @author Oleg Zhurakousky
* @since 2.1
*
*/
public abstract class AbstractComposableFunctionRegistry
implements FunctionRegistry, FunctionInspector,
ApplicationEventPublisherAware, EnvironmentAware {
private Map<String, Object> suppliers = new ConcurrentHashMap<>();
private Map<String, Object> functions = new ConcurrentHashMap<>();
private Map<String, Object> consumers = new ConcurrentHashMap<>();
private Map<Object, String> names = new ConcurrentHashMap<>();
private Map<String, FunctionType> types = new ConcurrentHashMap<>();
private Environment environment = new StandardEnvironment();
protected ApplicationEventPublisher applicationEventPublisher;
@Override
public <T> T lookup(Class<?> type, String name) {
String functionDefinitionName = !StringUtils.hasText(name)
&& this.environment.containsProperty("spring.cloud.function.definition")
? this.environment.getProperty("spring.cloud.function.definition")
: name;
return this.doLookup(type, functionDefinitionName);
}
@Override
public void setApplicationEventPublisher(
ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public void setEnvironment(Environment environment) {
this.environment = environment;
}
@Override
public Set<String> getNames(Class<?> type) {
if (type == null) { //perhaps some synchronization
Set<String> names = new HashSet<>(suppliers.keySet());
names.addAll(functions.keySet());
names.addAll(consumers.keySet());
return names;
}
if (Supplier.class.isAssignableFrom(type)) {
return this.getSupplierNames();
}
if (Consumer.class.isAssignableFrom(type)) {
return this.getConsumerNames();
}
if (Function.class.isAssignableFrom(type)) {
return this.getFunctionNames();
}
return Collections.emptySet();
}
public Set<String> getSupplierNames() {
return this.suppliers.keySet();
}
public Set<String> getFunctionNames() {
return this.functions.keySet();
}
public Set<String> getConsumerNames() {
return this.consumers.keySet();
}
public boolean hasSuppliers() {
return !CollectionUtils.isEmpty(this.suppliers);
}
public boolean hasFunctions() {
return !CollectionUtils.isEmpty(this.functions);
}
public boolean hasConsumers() {
return !CollectionUtils.isEmpty(this.consumers);
}
/**
* The count of all Suppliers, Function and Consumers currently registered.
* @return the count of all Suppliers, Function and Consumers currently registered.
*/
public int size() {
return this.suppliers.size() + this.functions.size() + this.consumers.size();
}
public FunctionType getFunctionType(String name) {
return this.types.get(name);
}
/**
* A reverse lookup where one can determine the actual name of
* the function reference.
* @param function should be an instance of {@link Supplier},
* {@link Function} or {@link Consumer};
* @return the name of the function or null.
*/
public String lookupFunctionName(Object function) {
return this.names.containsKey(function) ? this.names.get(function) : null;
}
public void addSupplier(String name, Object supplier) {
this.suppliers.put(name, supplier);
this.addName(supplier, name);
}
public void addFunction(String name, Object function) {
this.functions.put(name, function);
this.addName(function, name);
}
public void addConsumer(String name, Object consumer) {
this.consumers.put(name, consumer);
this.addName(consumer, name);
}
protected void wrap(FunctionRegistration<?> registration, String key) {
Object target = registration.getTarget();
this.addName(target, key);
if (registration.getType() != null) {
this.addType(key, registration.getType());
}
else {
FunctionType functionType = findType(target);
this.addType(key, functionType);
registration.type(functionType.getType());
}
Class<?> type;
registration = isolated(registration).wrap();
target = registration.getTarget();
if (target instanceof Supplier) {
type = Supplier.class;
for (String name : registration.getNames()) {
this.addSupplier(name, registration.getTarget());
}
}
else if (target instanceof Consumer) {
type = Consumer.class;
for (String name : registration.getNames()) {
this.addConsumer(name, registration.getTarget());
}
}
else if (target instanceof Function) {
type = Function.class;
for (String name : registration.getNames()) {
this.addFunction(name, registration.getTarget());
}
}
else {
return;
}
this.addName(registration.getTarget(), key);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent(
registration.getTarget(), type, registration.getNames()));
}
}
protected FunctionType findType(Object function) {
throw new UnsupportedOperationException("There is no default "
+ "implementation of this operation. It must be overriden "
+ "by the implementation of FunctionRegistry.");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private Object doLookup(String name, Map lookup, Class<?> typeOfFunction) {
Object function = compose(name, lookup);
if (function != null && typeOfFunction.isAssignableFrom(function.getClass())) {
return function;
}
return null;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration<?> isolated(FunctionRegistration<?> input) {
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
Object target = registration.getTarget();
boolean isolated = getClass().getClassLoader() != target.getClass()
.getClassLoader();
if (isolated) {
if (target instanceof Supplier<?> && isolated) {
target = new IsolatedSupplier((Supplier<?>) target);
}
else if (target instanceof Function<?, ?>) {
target = new IsolatedFunction((Function<?, ?>) target);
}
else if (target instanceof Consumer<?>) {
target = new IsolatedConsumer((Consumer<?>) target);
}
}
registration.target(target);
return registration;
}
private void addType(String name, FunctionType functionType) {
this.types.computeIfAbsent(name, str -> functionType);
}
private void addName(Object function, String name) {
this.names.put(function, name);
}
private Supplier<?> lookupSupplier(String name) {
return (Supplier<?>) doLookup(name, this.suppliers, Supplier.class);
}
private Function<?, ?> lookupFunction(String name) {
return (Function<?, ?>) doLookup(name, this.functions, Function.class);
}
private Consumer<?> lookupConsumer(String name) {
return (Consumer<?>) doLookup(name, this.consumers, Consumer.class);
}
private Object compose(String name, Map<String, Object> lookup) {
name = normalizeName(name);
Object composedFunction = null;
if (lookup.containsKey(name)) {
composedFunction = lookup.get(name);
}
else {
if (name.equals("") && lookup.size() == 1) {
composedFunction = lookup.values().iterator().next();
}
else {
String[] stages = StringUtils.delimitedListToStringArray(name, "|");
if (Stream.of(stages).allMatch(funcName -> contains(funcName))) {
List<Object> composableFunctions = Stream.of(stages).map(funcName -> find(funcName))
.collect(Collectors.toList());
composedFunction = composableFunctions.stream().reduce((a, z) -> composeFunctions(a, z))
.orElseGet(() -> null);
if (composedFunction != null && !this.types.containsKey(name) && this.types.containsKey(stages[0])
&& this.types.containsKey(stages[stages.length - 1])) {
FunctionType input = this.types.get(stages[0]);
FunctionType output = this.types.get(stages[stages.length - 1]);
this.types.put(name, FunctionType.compose(input, output));
this.names.put(composedFunction, name);
if (composedFunction instanceof Function) {
this.functions.put(name, composedFunction);
}
else if (composedFunction instanceof Consumer) {
this.consumers.put(name, composedFunction);
}
else if (composedFunction instanceof Supplier) {
this.suppliers.put(name, composedFunction);
}
}
}
}
}
return composedFunction;
}
private String normalizeName(String name) {
return name.replaceAll(",", "|").trim();
}
private boolean contains(String name) {
return suppliers.containsKey(name) || functions.containsKey(name) || consumers.containsKey(name);
}
private Object find(String name) {
Object result = suppliers.get(name);
if (result == null) {
result = functions.get(name);
}
if (result == null) {
result = consumers.get(name);
}
return result;
}
@SuppressWarnings("unchecked")
private Object composeFunctions(Object a, Object b) {
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
return (Supplier<Mono<Void>>) () -> Mono
.from(supplier.get().compose(v -> fConsumer.apply(supplier.get())));
}
else {
throw new IllegalStateException(
"The provided supplier is finite (i.e., already composed with Consumer) "
+ "therefore it can not be composed with another consumer");
}
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
return (Supplier<Object>) () -> function.apply(supplier.get());
}
}
else if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;
Function<Object, Object> function2 = (Function<Object, Object>) b;
if (function1 instanceof FluxToMonoFunction) {
if (function2 instanceof MonoToFluxFunction) {
return function1.andThen(function2);
}
else {
throw new IllegalStateException("The provided function is finite (i.e., returns Mono<?>) "
+ "therefore it can *only* be composed with compatible function (i.e., Function<Mono, Flux>");
}
}
else if (function2 instanceof FluxToMonoFunction) {
return new FluxToMonoFunction<Object, Object>(((Function<Flux<Object>, Flux<Object>>) a)
.andThen(((FluxToMonoFunction<Object, Object>) b).getTarget()));
}
else {
return function1.andThen(function2);
}
}
else if (a instanceof Function && b instanceof Consumer) {
Function<Object, Object> function = (Function<Object, Object>) a;
Consumer<Object> consumer = (Consumer<Object>) b;
return (Consumer<Object>) v -> consumer.accept(function.apply(v));
}
else {
throw new IllegalArgumentException(
String.format("Could not compose %s and %s", a.getClass(), b.getClass()));
}
}
@SuppressWarnings("unchecked")
private <T> T doLookup(Class<?> type, String name) {
T function = null;
if (type == null) {
function = (T) this.lookupFunction(name);
if (function == null) {
function = (T) this.lookupConsumer(name);
}
if (function == null) {
function = (T) this.lookupSupplier(name);
}
}
else if (Function.class.isAssignableFrom(type)) {
function = (T) this.lookupFunction(name);
}
else if (Supplier.class.isAssignableFrom(type)) {
function = (T) this.lookupSupplier(name);
}
else if (Consumer.class.isAssignableFrom(type)) {
function = (T) this.lookupConsumer(name);
}
return function;
}
}

View File

@@ -23,12 +23,7 @@ import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.cloud.function.context.AbstractFunctionRegistry;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.util.Assert;
@@ -37,9 +32,7 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @author Oleg Zhurakousky
*/
public class InMemoryFunctionCatalog extends AbstractFunctionRegistry {
private final Map<Class<?>, Map<String, Object>> functions;
public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry {
private final Map<Object, FunctionRegistration<?>> registrations;
@@ -49,7 +42,6 @@ public class InMemoryFunctionCatalog extends AbstractFunctionRegistry {
public InMemoryFunctionCatalog(Set<FunctionRegistration<?>> registrations) {
Assert.notNull(registrations, "'registrations' must not be null");
this.functions = new HashMap<>();
this.registrations = new HashMap<>();
registrations.stream().forEach(reg -> register(reg));
}
@@ -85,64 +77,13 @@ public class InMemoryFunctionCatalog extends AbstractFunctionRegistry {
type = Function.class;
}
}
Map<String, Object> map = this.functions.computeIfAbsent(type,
key -> new HashMap<>());
for (String name : registration.getNames()) {
map.put(name, registration.getTarget());
this.addFunction(name, registration.getTarget());
}
this.publishEvent(event);
}
@PostConstruct
public void init() {
if (this.applicationEventPublisher != null && !this.functions.isEmpty()) {
this.functions.keySet()
.forEach(type -> this.publishEvent(new FunctionRegistrationEvent(this,
type, this.functions.get(type).keySet())));
}
}
@PreDestroy
public void close() {
if (this.applicationEventPublisher != null && !this.functions.isEmpty()) {
this.functions.keySet().forEach(
type -> this.publishEvent(new FunctionUnregistrationEvent(this, type,
this.functions.get(type).keySet())));
}
}
@Override
@SuppressWarnings("unchecked")
public <T> T doLookup(Class<?> type, String name) {
T function = null;
if (type == null) {
function = (T) this.functions.values().stream()
.filter(map -> map.get(name) != null).map(map -> map.get(name))
.findFirst().orElse(null);
}
else {
function = (T) this.extractTypeMap(type).get(name);
}
return function;
}
@Override
public Set<String> getNames(Class<?> type) {
if (type == null) {
return this.functions.values().stream().flatMap(map -> map.keySet().stream())
.collect(Collectors.toSet());
}
Map<String, Object> map = this.extractTypeMap(type);
return map == null ? Collections.emptySet() : map.keySet();
}
private Map<String, Object> extractTypeMap(Class<?> type) {
return this.functions.keySet().stream()
.filter(key -> key != Object.class && key.isAssignableFrom(type))
.map(key -> this.functions.get(key)).findFirst()
.orElse(this.functions.get(Object.class));
}
private void publishEvent(Object event) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2016-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,26 +18,20 @@ package org.springframework.cloud.function.context.config;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
@@ -51,20 +45,12 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.function.context.AbstractFunctionRegistry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionRegistrationEvent;
import org.springframework.cloud.function.context.catalog.AbstractComposableFunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionUnregistrationEvent;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
import org.springframework.cloud.function.core.IsolatedSupplier;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.context.ApplicationEventPublisher;
@@ -76,7 +62,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
@@ -88,103 +73,44 @@ import org.springframework.util.StringUtils;
*/
@Configuration
@ConditionalOnMissingBean(FunctionCatalog.class)
// @checkstyle:off
@ComponentScan(basePackages = "${spring.cloud.function.scan.packages:functions}", includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE, classes = {
Supplier.class, Function.class, Consumer.class }))
// @checkstyle:on
@ComponentScan(
basePackages = "${spring.cloud.function.scan.packages:functions}",
includeFilters = @Filter(type = FilterType.ASSIGNABLE_TYPE,
classes = {
Supplier.class,
Function.class,
Consumer.class }))
public class ContextFunctionCatalogAutoConfiguration {
static final String PREFERRED_MAPPER_PROPERTY = "spring.http.converters.preferred-json-mapper";
@Bean
public FunctionRegistry functionCatalog(ContextFunctionRegistry processor) {
return new BeanFactoryFunctionCatalog(processor);
public FunctionRegistry functionCatalog() {
return new BeanFactoryFunctionCatalog();
}
protected static class BeanFactoryFunctionCatalog extends AbstractFunctionRegistry {
private final ContextFunctionRegistry processor;
public BeanFactoryFunctionCatalog(ContextFunctionRegistry processor) {
this.processor = processor;
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
return function == null ? null : this.processor.getRegistration(function);
}
@Override
public <T> void register(FunctionRegistration<T> registration) {
Assert.notEmpty(registration.getNames(),
"'registration' must contain at least one name before it is registered in catalog.");
this.processor.register(registration);
}
@Override
public Set<String> getNames(Class<?> type) {
if (Supplier.class.isAssignableFrom(type)) {
return this.processor.suppliers.keySet();
}
if (Consumer.class.isAssignableFrom(type)) {
return this.processor.consumers.keySet();
}
if (Function.class.isAssignableFrom(type)) {
return this.processor.functions.keySet();
}
return Collections.emptySet();
}
@Override
public int size() {
return this.processor.suppliers.size() + this.processor.functions.size()
+ this.processor.consumers.size();
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doLookup(Class<?> type, String name) {
T function = null;
if (type == null) {
function = (T) this.processor.lookupFunction(name);
if (function == null) {
function = (T) this.processor.lookupConsumer(name);
}
if (function == null) {
function = (T) this.processor.lookupSupplier(name);
}
}
else if (Function.class.isAssignableFrom(type)) {
function = (T) this.processor.lookupFunction(name);
}
else if (Supplier.class.isAssignableFrom(type)) {
function = (T) this.processor.lookupSupplier(name);
}
else if (Consumer.class.isAssignableFrom(type)) {
function = (T) this.processor.lookupConsumer(name);
}
return function;
}
}
@Component
protected static class ContextFunctionRegistry
implements InitializingBean, BeanFactoryAware {
protected static class BeanFactoryFunctionCatalog extends AbstractComposableFunctionRegistry
implements InitializingBean, BeanFactoryAware {
private ApplicationEventPublisher applicationEventPublisher;
private ConfigurableListableBeanFactory beanFactory;
private Map<String, Object> suppliers = new ConcurrentHashMap<>();
@Override
public FunctionRegistration<?> getRegistration(Object function) {
String functionName = this.lookupFunctionName(function);
if (StringUtils.hasText(functionName)) {
return new FunctionRegistration<>(function, functionName)
.type(findType(function).getType());
}
return null;
}
private Map<String, Object> functions = new ConcurrentHashMap<>();
private Map<String, Object> consumers = new ConcurrentHashMap<>();
private Map<Object, String> names = new ConcurrentHashMap<>();
private Map<String, FunctionType> types = new ConcurrentHashMap<>();
public <T> void register(FunctionRegistration<T> functionRegistration) {
Assert.notEmpty(functionRegistration.getNames(),
"'registration' must contain at least one name before it is registered in catalog.");
wrap(functionRegistration, functionRegistration.getNames().iterator().next());
}
/**
* Will collect all suppliers, functions, consumers and function registration as
@@ -213,46 +139,36 @@ public class ContextFunctionCatalogAutoConfiguration {
@PreDestroy
public void close() {
if (this.applicationEventPublisher != null) {
if (!this.functions.isEmpty()) {
if (this.hasFunctions()) {
this.applicationEventPublisher
.publishEvent(new FunctionUnregistrationEvent(this,
Function.class, this.functions.keySet()));
Function.class, this.getFunctionNames()));
}
if (!this.consumers.isEmpty()) {
if (this.hasConsumers()) {
this.applicationEventPublisher
.publishEvent(new FunctionUnregistrationEvent(this,
Consumer.class, this.consumers.keySet()));
Consumer.class, this.getConsumerNames()));
}
if (!this.suppliers.isEmpty()) {
if (this.hasSuppliers()) {
this.applicationEventPublisher
.publishEvent(new FunctionUnregistrationEvent(this,
Supplier.class, this.suppliers.keySet()));
Supplier.class, this.getSupplierNames()));
}
}
}
<T> void register(FunctionRegistration<T> function) {
wrap(function, function.getNames().iterator().next());
}
@Override
protected FunctionType findType(Object function) {
String name = this.lookupFunctionName(function);
FunctionType functionType = this.getFunctionType(name);
FunctionRegistration<?> getRegistration(Object function) {
if (names.containsKey(function)) {
return new FunctionRegistration<>(function, this.names.get(function))
.type(findType(function).getType());
if (functionType == null) {
functionType = functionByNameExist(name)
? new FunctionType(function.getClass()) : new FunctionType(
FunctionContextUtils.findType(name, this.beanFactory));
}
return null;
}
Supplier<?> lookupSupplier(String name) {
return (Supplier<?>) lookup(name, this.suppliers, Supplier.class);
}
Function<?, ?> lookupFunction(String name) {
return (Function<?, ?>) lookup(name, this.functions, Function.class);
}
Consumer<?> lookupConsumer(String name) {
return (Consumer<?>) lookup(name, this.consumers, Consumer.class);
return functionType;
}
// @checkstyle:off
@@ -269,135 +185,6 @@ public class ContextFunctionCatalogAutoConfiguration {
}
// @checkstyle:on
@SuppressWarnings("unchecked")
private Object lookup(String name, @SuppressWarnings("rawtypes") Map lookup,
Class<?> typeOfFunction) {
Object function = compose(name, lookup);
if (function != null
&& typeOfFunction.isAssignableFrom(function.getClass())) {
return function;
}
return null;
}
private String normalizeName(String name) {
return name.replaceAll(",", "|").trim();
}
private Object compose(String name, Map<String, Object> lookup) {
name = normalizeName(name);
Object composedFunction = null;
if (lookup.containsKey(name)) {
composedFunction = lookup.get(name);
}
else {
if (name.equals("") && lookup.size() == 1) {
composedFunction = lookup.values().iterator().next();
}
else {
String[] stages = StringUtils.delimitedListToStringArray(name, "|");
if (Stream.of(stages).allMatch(funcName -> contains(funcName))) {
List<Object> composableFunctions = Stream.of(stages)
.map(funcName -> find(funcName))
.collect(Collectors.toList());
composedFunction = composableFunctions.stream()
.reduce((a, z) -> composeFunctions(a, z))
.orElseGet(() -> null);
if (composedFunction != null && !this.types.containsKey(name)
&& this.types.containsKey(stages[0])
&& this.types.containsKey(stages[stages.length - 1])) {
FunctionType input = this.types.get(stages[0]);
FunctionType output = this.types
.get(stages[stages.length - 1]);
this.types.put(name, FunctionType.compose(input, output));
this.names.put(composedFunction, name);
if (composedFunction instanceof Function) {
this.functions.put(name, composedFunction);
}
else if (composedFunction instanceof Consumer) {
this.consumers.put(name, composedFunction);
}
else if (composedFunction instanceof Supplier) {
this.suppliers.put(name, composedFunction);
}
}
}
}
}
return composedFunction;
}
private boolean contains(String name) {
return suppliers.containsKey(name) || functions.containsKey(name)
|| consumers.containsKey(name);
}
private Object find(String name) {
Object result = suppliers.get(name);
if (result == null) {
result = functions.get(name);
}
if (result == null) {
result = consumers.get(name);
}
return result;
}
@SuppressWarnings("unchecked")
private Object composeFunctions(Object a, Object b) {
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
return (Supplier<Mono<Void>>) () -> Mono.from(supplier.get()
.compose(v -> fConsumer.apply(supplier.get())));
}
else {
throw new IllegalStateException(
"The provided supplier is finite (i.e., already composed with Consumer) "
+ "therefore it can not be composed with another consumer");
}
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
return (Supplier<Object>) () -> function.apply(supplier.get());
}
}
else if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;
Function<Object, Object> function2 = (Function<Object, Object>) b;
if (function1 instanceof FluxToMonoFunction) {
if (function2 instanceof MonoToFluxFunction) {
return function1.andThen(function2);
}
else {
throw new IllegalStateException(
"The provided function is finite (i.e., returns Mono<?>) "
+ "therefore it can *only* be composed with compatible function (i.e., Function<Mono, Flux>");
}
}
else if (function2 instanceof FluxToMonoFunction) {
return new FluxToMonoFunction<Object, Object>(
((Function<Flux<Object>, Flux<Object>>) a)
.andThen(((FluxToMonoFunction<Object, Object>) b)
.getTarget()));
}
else {
return function1.andThen(function2);
}
}
else if (a instanceof Function && b instanceof Consumer) {
Function<Object, Object> function = (Function<Object, Object>) a;
Consumer<Object> consumer = (Consumer<Object>) b;
return (Consumer<Object>) v -> consumer.accept(function.apply(v));
}
else {
throw new IllegalArgumentException(String.format(
"Could not compose %s and %s", a.getClass(), b.getClass()));
}
}
private Collection<String> getAliases(String key) {
Collection<String> names = new LinkedHashSet<>();
String value = getQualifier(key);
@@ -408,67 +195,67 @@ public class ContextFunctionCatalogAutoConfiguration {
return names;
}
private void wrap(FunctionRegistration<?> registration, String key) {
Object target = registration.getTarget();
this.names.put(target, key);
if (registration.getType() != null) {
this.types.put(key, registration.getType());
}
else {
registration.type(findType(target).getType());
}
Class<?> type;
registration = isolated(registration).wrap();
target = registration.getTarget();
if (target instanceof Supplier) {
type = Supplier.class;
for (String name : registration.getNames()) {
this.suppliers.put(name, registration.getTarget());
}
}
else if (target instanceof Consumer) {
type = Consumer.class;
for (String name : registration.getNames()) {
this.consumers.put(name, registration.getTarget());
}
}
else if (target instanceof Function) {
type = Function.class;
for (String name : registration.getNames()) {
this.functions.put(name, registration.getTarget());
}
}
else {
return;
}
this.names.put(registration.getTarget(), key);
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent(
registration.getTarget(), type, registration.getNames()));
}
}
// private void wrap(FunctionRegistration<?> registration, String key) {
// Object target = registration.getTarget();
//// this.addName(target, key);
// if (registration.getType() != null) {
// this.addType(key, registration.getType());
// }
// else {
// registration.type(findType(target).getType());
// }
// Class<?> type;
// registration = isolated(registration).wrap();
// target = registration.getTarget();
// if (target instanceof Supplier) {
// type = Supplier.class;
// for (String name : registration.getNames()) {
// this.addSupplier(name, registration.getTarget());
// }
// }
// else if (target instanceof Consumer) {
// type = Consumer.class;
// for (String name : registration.getNames()) {
// this.addConsumer(name, registration.getTarget());
// }
// }
// else if (target instanceof Function) {
// type = Function.class;
// for (String name : registration.getNames()) {
// this.addFunction(name, registration.getTarget());
// }
// }
// else {
// return;
// }
// //this.addName(registration.getTarget(), key);
// if (this.applicationEventPublisher != null) {
// this.applicationEventPublisher.publishEvent(new FunctionRegistrationEvent(
// registration.getTarget(), type, registration.getNames()));
// }
// }
@SuppressWarnings({ "rawtypes", "unchecked" })
private FunctionRegistration<?> isolated(FunctionRegistration<?> input) {
FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
Object target = registration.getTarget();
boolean isolated = getClass().getClassLoader() != target.getClass()
.getClassLoader();
if (isolated) {
if (target instanceof Supplier<?> && isolated) {
target = new IsolatedSupplier((Supplier<?>) target);
}
else if (target instanceof Function<?, ?>) {
target = new IsolatedFunction((Function<?, ?>) target);
}
else if (target instanceof Consumer<?>) {
target = new IsolatedConsumer((Consumer<?>) target);
}
}
registration.target(target);
return registration;
}
// @SuppressWarnings({ "rawtypes", "unchecked" })
// private FunctionRegistration<?> isolated(FunctionRegistration<?> input) {
// FunctionRegistration<Object> registration = (FunctionRegistration<Object>) input;
// Object target = registration.getTarget();
// boolean isolated = getClass().getClassLoader() != target.getClass()
// .getClassLoader();
// if (isolated) {
// if (target instanceof Supplier<?> && isolated) {
// target = new IsolatedSupplier((Supplier<?>) target);
// }
// else if (target instanceof Function<?, ?>) {
// target = new IsolatedFunction((Function<?, ?>) target);
// }
// else if (target instanceof Consumer<?>) {
// target = new IsolatedConsumer((Consumer<?>) target);
// }
// }
//
// registration.target(target);
// return registration;
// }
private String getQualifier(String key) {
if (this.beanFactory != null
@@ -487,22 +274,6 @@ public class ContextFunctionCatalogAutoConfiguration {
return key;
}
private FunctionType findType(Object function) {
String name = this.names.get(function);
FunctionType functionType;
if (this.types.containsKey(name)) {
functionType = this.types.get(name);
}
else {
functionType = functionByNameExist(name)
? new FunctionType(function.getClass()) : new FunctionType(
FunctionContextUtils.findType(name, this.beanFactory));
this.types.computeIfAbsent(name, str -> functionType);
}
return functionType;
}
private boolean functionByNameExist(String name) {
return name == null || this.beanFactory == null
|| !this.beanFactory.containsBeanDefinition(name);
@@ -540,7 +311,6 @@ public class ContextFunctionCatalogAutoConfiguration {
registrations.forEach(registration -> wrap(registration,
targets.get(registration.getTarget())));
}
}
private static class PreferGsonOrMissingJacksonCondition extends AnyNestedCondition {
@@ -577,9 +347,10 @@ public class ContextFunctionCatalogAutoConfiguration {
@Configuration
@ConditionalOnClass(ObjectMapper.class)
@ConditionalOnBean(ObjectMapper.class)
// @checkstyle:off
@ConditionalOnProperty(name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY, havingValue = "jackson", matchIfMissing = true)
// @checkstyle:on
@ConditionalOnProperty(
name = ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY,
havingValue = "jackson",
matchIfMissing = true)
protected static class JacksonConfiguration {
@Bean

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.catalog;
import java.util.function.Function;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
@@ -59,6 +60,40 @@ public class InMemoryFunctionCatalogTests {
assertThat(lookedUpFunction instanceof FluxFunction).isTrue();
}
@Test
public void testFunctionComposition() {
FunctionRegistration<UpperCase> upperCaseRegistration = new FunctionRegistration<>(
new UpperCase(), "uppercase").type(FunctionType.of(UpperCase.class).getType());
FunctionRegistration<Reverse> reverseRegistration = new FunctionRegistration<>(
new Reverse(), "reverse").type(FunctionType.of(Reverse.class).getType());
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(upperCaseRegistration);
catalog.register(reverseRegistration);
Function<Flux<String>, Flux<String>> lookedUpFunction = catalog.lookup("uppercase|reverse");
assertThat(lookedUpFunction).isNotNull();
assertThat(lookedUpFunction.apply(Flux.just("star")).blockFirst()).isEqualTo("RATS");
}
private static class UpperCase implements Function<String, String> {
@Override
public String apply(String t) {
return t.toUpperCase();
}
}
private static class Reverse implements Function<String, String> {
@Override
public String apply(String t) {
return new StringBuilder(t).reverse().toString();
}
}
private static class TestFunction implements Function<Integer, String> {
@Override

View File

@@ -32,18 +32,15 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
public class BeanFactoryFunctionCatalogTests {
private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog(
new ContextFunctionRegistry());
private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog();
@Test
public void basicRegistrationFeatures() {

View File

@@ -35,7 +35,7 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.ContextFunctionRegistry;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration.BeanFactoryFunctionCatalog;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.ClassUtils;
@@ -49,7 +49,7 @@ import static org.assertj.core.api.Assertions.assertThat;
// for functions with the same name, uncomposable combinations)
public class ContextFunctionPostProcessorTests {
private ContextFunctionRegistry processor = new ContextFunctionRegistry();
private BeanFactoryFunctionCatalog processor = new BeanFactoryFunctionCatalog();
private URLClassLoader classLoader;
@@ -70,10 +70,11 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos");
.lookup(null, "foos"); //lookupFunction("foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@SuppressWarnings("deprecation")
@Test
public void registrationThroughMerge() {
FunctionRegistration<Foos> registration = new FunctionRegistration<>(new Foos(),
@@ -82,17 +83,18 @@ public class ContextFunctionPostProcessorTests {
Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap());
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos");
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@SuppressWarnings("deprecation")
@Test
public void registrationThroughMergeFromNamedFunction() {
this.processor.merge(Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap(), Collections.singletonMap("foos", new Foos()));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos");
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@@ -102,7 +104,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(new Bars(), "bars"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos,bars");
.lookup(null, "foos,bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
@@ -116,7 +118,7 @@ public class ContextFunctionPostProcessorTests {
(x) -> x.toUpperCase(), "function"));
@SuppressWarnings("unchecked")
Supplier<Flux<String>> supplier = (Supplier<Flux<String>>) this.processor
.lookupSupplier("supplier|function");
.lookup(Supplier.class, "supplier|function");
assertThat(supplier.get().blockFirst()).isEqualTo("FOO");
assertThat(this.processor.getRegistration(supplier).getNames())
.containsExactly("supplier|function");
@@ -130,7 +132,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<Consumer<String>>(
System.out::println, "consumer"));
Supplier<Mono<Void>> supplier = (Supplier<Mono<Void>>) this.processor
.lookupSupplier("supplier|consumer");
.lookup(Supplier.class, "supplier|consumer");
assertThat(supplier.get().block()).isNull();
}
@@ -140,7 +142,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(new Bars(), "bars"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos|bars");
.lookup(null, "foos|bars");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("foos|bars");
@@ -152,7 +154,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(new Foos(), "foos"));
@SuppressWarnings("unchecked")
Supplier<Flux<String>> foos = (Supplier<Flux<String>>) this.processor
.lookupSupplier("ints|foos");
.lookup(Supplier.class, "ints|foos");
assertThat(foos.get().blockFirst()).isEqualTo("8");
assertThat(this.processor.getRegistration(foos).getNames())
.containsExactly("ints|foos");
@@ -167,7 +169,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(create(Foos.class), "foos"));
@SuppressWarnings("unchecked")
Function<Flux<Integer>, Flux<String>> foos = (Function<Flux<Integer>, Flux<String>>) this.processor
.lookupFunction("foos");
.lookup(null, "foos");
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("4");
}
@@ -179,7 +181,7 @@ public class ContextFunctionPostProcessorTests {
.register(new FunctionRegistration<>(create(Source.class), "source"));
@SuppressWarnings("unchecked")
Supplier<Flux<Integer>> source = (Supplier<Flux<Integer>>) this.processor
.lookupSupplier("source");
.lookup(Supplier.class, "source");
assertThat(source.get().blockFirst()).isEqualTo(4);
}
@@ -191,7 +193,7 @@ public class ContextFunctionPostProcessorTests {
this.processor.register(new FunctionRegistration<>(target, "sink"));
@SuppressWarnings("unchecked")
Function<Flux<String>, Mono<Void>> sink = (Function<Flux<String>, Mono<Void>>) this.processor
.lookupFunction("sink");
.lookup(null, "sink");
sink.apply(Flux.just("Hello")).subscribe();
@SuppressWarnings("unchecked")
List<String> values = (List<String>) ReflectionTestUtils.getField(target,