Convert functions etc. after context starts

The app deployer now has to reach into the function contexts and
extract a catalog and call its methods reflectively.
This commit is contained in:
Dave Syer
2017-01-04 10:43:18 +00:00
parent 13774abe39
commit 91717ec9a6
9 changed files with 327 additions and 258 deletions

View File

@@ -18,7 +18,10 @@ package org.springframework.cloud.function.context;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -27,15 +30,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.BeanDefinitionRegistryPostProcessor;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.support.SimpleBeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
@@ -64,17 +63,23 @@ public class ContextFunctionCatalogAutoConfiguration {
private Map<String, Supplier<?>> suppliers = Collections.emptyMap();
@Bean
public FunctionCatalog functionCatalog() {
return new ApplicationContextFunctionCatalog(functions, consumers, suppliers);
public FunctionCatalog functionCatalog(ContextFunctionPostProcessor processor,
ObjectMapper mapper) {
return new ApplicationContextFunctionCatalog(
processor.wrapFunctions(mapper, functions),
processor.wrapConsumers(mapper, consumers),
processor.wrapSuppliers(mapper, suppliers));
}
@Component
public static class ContextFunctionPostProcessor
implements BeanFactoryPostProcessor, BeanDefinitionRegistryPostProcessor {
private BeanDefinitionRegistry registry;
private Set<String> functions = new HashSet<>();
private Set<String> consumers = new HashSet<>();
private Set<String> suppliers = new HashSet<>();
private BeanDefinitionRegistry targets = new SimpleBeanDefinitionRegistry();
private BeanDefinitionRegistry registry;
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry)
@@ -82,18 +87,67 @@ public class ContextFunctionCatalogAutoConfiguration {
this.registry = registry;
}
public Map<String, Supplier<?>> wrapSuppliers(ObjectMapper mapper,
Map<String, Supplier<?>> suppliers) {
Map<String, Supplier<?>> result = new HashMap<>();
for (String key : suppliers.keySet()) {
if (this.suppliers.contains(key)) {
@SuppressWarnings("unchecked")
Supplier<Flux<?>> supplier = (Supplier<Flux<?>>) suppliers.get(key);
result.put(key, wrapSupplier(supplier, mapper));
}
else {
result.put(key, suppliers.get(key));
}
}
return result;
}
public Map<String, Consumer<?>> wrapConsumers(ObjectMapper mapper,
Map<String, Consumer<?>> consumers) {
Map<String, Consumer<?>> result = new HashMap<>();
for (String key : consumers.keySet()) {
if (this.consumers.contains(key)) {
@SuppressWarnings("unchecked")
Consumer<Flux<?>> consumer = (Consumer<Flux<?>>) consumers.get(key);
result.put(key, wrapConsumer(consumer, mapper, key));
}
else {
result.put(key, consumers.get(key));
}
}
return result;
}
public Map<String, Function<?, ?>> wrapFunctions(ObjectMapper mapper,
Map<String, Function<?, ?>> functions) {
Map<String, Function<?, ?>> result = new HashMap<>();
for (String key : functions.keySet()) {
if (this.functions.contains(key)) {
@SuppressWarnings("unchecked")
Function<Flux<?>, Flux<?>> function = (Function<Flux<?>, Flux<?>>) functions
.get(key);
result.put(key, wrapFunction(function, mapper, key));
}
else {
result.put(key, functions.get(key));
}
}
return result;
}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory factory)
throws BeansException {
for (String name : factory.getBeanDefinitionNames()) {
if (isGenericFunction(factory, name)) {
wrapFunction(factory, name);
this.functions.add(name);
}
else if (isGenericSupplier(factory, name)) {
wrapSupplier(factory, name);
this.suppliers.add(name);
}
else if (isGenericConsumer(factory, name)) {
wrapConsumer(factory, name);
this.consumers.add(name);
}
}
}
@@ -131,40 +185,44 @@ public class ContextFunctionCatalogAutoConfiguration {
String.class)));
}
private void wrapFunction(ConfigurableListableBeanFactory factory, String name) {
AbstractBeanDefinition wrapped = getInputAwareWrappedBean(name,
ProxyFunction.class).getBeanDefinition();
registry.registerBeanDefinition(name, wrapped);
private ProxyFunction wrapFunction(Function<Flux<?>, Flux<?>> function,
ObjectMapper mapper, String name) {
ProxyFunction wrapped = new ProxyFunction(mapper);
wrapped.setDelegate(function);
wrapped.setType(findType(name));
return wrapped;
}
private void wrapSupplier(ConfigurableListableBeanFactory factory, String name) {
AbstractBeanDefinition wrapped = getWrappedBean(name, ProxySupplier.class)
.getBeanDefinition();
registry.registerBeanDefinition(name, wrapped);
private ProxySupplier wrapSupplier(Supplier<Flux<?>> supplier,
ObjectMapper mapper) {
ProxySupplier wrapped = new ProxySupplier(mapper);
wrapped.setDelegate(supplier);
return wrapped;
}
private void wrapConsumer(ConfigurableListableBeanFactory factory, String name) {
AbstractBeanDefinition wrapped = getInputAwareWrappedBean(name,
ProxyConsumer.class).getBeanDefinition();
registry.registerBeanDefinition(name, wrapped);
private ProxyConsumer wrapConsumer(Consumer<Flux<?>> consumer,
ObjectMapper mapper, String name) {
ProxyConsumer wrapped = new ProxyConsumer(mapper);
wrapped.setDelegate(consumer);
wrapped.setType(findType(name));
return wrapped;
}
private BeanDefinitionBuilder getInputAwareWrappedBean(String name,
Class<?> type) {
BeanDefinitionBuilder builder = getWrappedBean(name, type);
builder.addPropertyValue("name", name);
targets.registerBeanDefinition(name, registry.getBeanDefinition(name));
builder.addPropertyValue("registry", targets);
return builder;
private Class<?> findType(RootBeanDefinition definition) {
StandardMethodMetadata source = (StandardMethodMetadata) definition
.getSource();
ParameterizedType type = (ParameterizedType) (source.getIntrospectedMethod()
.getGenericReturnType());
type = (ParameterizedType) type.getActualTypeArguments()[0];
Type param = type.getActualTypeArguments()[0];
return ClassUtils.resolveClassName(param.getTypeName(),
registry.getClass().getClassLoader());
}
private BeanDefinitionBuilder getWrappedBean(String name, Class<?> type) {
BeanDefinition definition = registry.getBeanDefinition(name);
BeanDefinitionBuilder builder = BeanDefinitionBuilder
.genericBeanDefinition(type);
builder.addPropertyValue("delegate", definition);
return builder;
private Class<?> findType(String name) {
return findType((RootBeanDefinition) registry.getBeanDefinition(name));
}
}
}
@@ -174,12 +232,8 @@ abstract class ProxyWrapper<T> {
private T delegate;
private String name;
private Class<?> type;
private BeanDefinitionRegistry registry;
public ProxyWrapper(ObjectMapper mapper) {
this.mapper = mapper;
}
@@ -192,29 +246,12 @@ abstract class ProxyWrapper<T> {
return delegate;
}
public void setName(String name) {
this.name = name;
}
public void setRegistry(BeanDefinitionRegistry registry) {
this.registry = registry;
}
private Class<?> findType(RootBeanDefinition definition) {
StandardMethodMetadata source = (StandardMethodMetadata) definition.getSource();
ParameterizedType type = (ParameterizedType) (source.getIntrospectedMethod()
.getGenericReturnType());
type = (ParameterizedType) type.getActualTypeArguments()[0];
Type param = type.getActualTypeArguments()[0];
return ClassUtils.resolveClassName(param.getTypeName(),
registry.getClass().getClassLoader());
public void setType(Class<?> type) {
this.type = type;
}
public Class<?> getType() {
if (type == null) {
type = findType((RootBeanDefinition) registry.getBeanDefinition(name));
}
return type;
return this.type;
}
public Object fromJson(String value) {
@@ -235,6 +272,11 @@ abstract class ProxyWrapper<T> {
}
}
@Override
public String toString() {
return "ProxyWrapper [delegate=" + delegate + ", type=" + type + "]";
}
}
class ProxyFunction extends ProxyWrapper<Function<Flux<?>, Flux<?>>>