Fix Build and upgrade fully to Boot 2.0

Some tests still ignored.

Also adds draft functional bean registration support. The AWS sample
is using that now (it starts up 4x faster in AWS). To activate the
functional beans user has to supply a main class of type
ApplicationContextInitializer.
This commit is contained in:
Dave Syer
2018-06-21 13:00:46 +01:00
parent 00e2b749d2
commit 4c9627aee3
23 changed files with 896 additions and 54 deletions

View File

@@ -23,10 +23,17 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
@@ -86,6 +93,11 @@ public class FunctionRegistration<T> {
return this;
}
public FunctionRegistration<T> type(FunctionType type) {
this.type = type;
return this;
}
/**
* Allows to override the target of this registration with a new target that typically
* wraps the original target. This typically happens when original target is wrapped
@@ -111,4 +123,35 @@ public class FunctionRegistration<T> {
return this.names(Arrays.asList(names));
}
public <S> FunctionRegistration<S> wrap() {
if (type == null || type.isWrapper()) {
@SuppressWarnings("unchecked")
FunctionRegistration<S> value = (FunctionRegistration<S>) this;
return value;
}
@SuppressWarnings("unchecked")
S target = (S) this.target;
FunctionRegistration<S> result = new FunctionRegistration<S>(target);
result.type(this.type.getType());
if (target instanceof Function) {
@SuppressWarnings({ "unchecked", "rawtypes" })
S wrapped = (S) new FluxFunction((Function) target);
target = wrapped;
result.type = result.type.wrap(Flux.class);
}
else if (target instanceof Supplier) {
@SuppressWarnings({ "unchecked", "rawtypes" })
S wrapped = (S) new FluxSupplier((Supplier) target);
target = wrapped;
result.type = result.type.wrap(Flux.class);
}
else if (target instanceof Consumer) {
@SuppressWarnings({ "unchecked", "rawtypes" })
S wrapped = (S) new FluxConsumer((Consumer) target);
target = wrapped;
result.type = result.type.wrap(Flux.class, Mono.class);
}
return result.target(target).names(this.names).properties(this.properties);
}
}

View File

@@ -53,7 +53,7 @@ public class FunctionType {
final private boolean message;
public FunctionType(Type type) {
this.type = type;
this.type = functionType(type);
this.inputWrapper = findType(ParamType.INPUT_WRAPPER);
this.outputWrapper = findType(ParamType.OUTPUT_WRAPPER);
this.inputType = findType(ParamType.INPUT);
@@ -61,6 +61,22 @@ public class FunctionType {
this.message = messageType();
}
private Type functionType(Type type) {
if (Supplier.class.isAssignableFrom(extractClass(type, ParamType.OUTPUT))) {
Type product = extractType(type, ParamType.OUTPUT, 0);
Class<?> output = extractClass(product, ParamType.OUTPUT);
if (FunctionRegistration.class.isAssignableFrom(output)) {
type = extractType(product, ParamType.OUTPUT, 0);
}
else if (Function.class.isAssignableFrom(output)
|| Supplier.class.isAssignableFrom(output)
|| Consumer.class.isAssignableFrom(output)) {
type = product;
}
}
return type;
}
private boolean messageType() {
Class<?> inputType = findType(ParamType.INPUT_INNER_WRAPPER);
Class<?> outputType = findType(ParamType.OUTPUT_INNER_WRAPPER);
@@ -124,6 +140,16 @@ public class FunctionType {
.forClassWithGenerics(Function.class, input, Object.class).getType());
}
public static FunctionType supplier(Class<?> input) {
return new FunctionType(
ResolvableType.forClassWithGenerics(Supplier.class, input).getType());
}
public static FunctionType consumer(Class<?> input) {
return new FunctionType(
ResolvableType.forClassWithGenerics(Consumer.class, input).getType());
}
public FunctionType to(Class<?> output) {
ResolvableType inputGeneric = input(this);
ResolvableType outputGeneric = output(output);
@@ -245,6 +271,11 @@ public class FunctionType {
private Class<?> findType(ParamType paramType) {
int index = paramType.isOutput() ? 1 : 0;
Type type = this.type;
if (Supplier.class.isAssignableFrom(extractClass(this.type, null))) {
if (paramType.isInput()) {
return Void.class;
}
}
boolean found = false;
while (!found && type instanceof Class && type != Object.class) {
Class<?> clz = (Class<?>) type;
@@ -317,6 +348,15 @@ public class FunctionType {
}
}
else {
if (type != null) {
Type[] interfaces = ((Class<?>) type).getGenericInterfaces();
for (Type ifc : interfaces) {
Type value = extractType(ifc, paramType, index);
if (value != Object.class) {
return value;
}
}
}
param = Object.class;
}
return param;

View File

@@ -27,7 +27,6 @@ import java.util.function.Supplier;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.context.ApplicationEventPublisher;
@@ -40,11 +39,12 @@ import org.springframework.util.Assert;
* @author Oleg Zhurakousky
*/
public class InMemoryFunctionCatalog
implements FunctionRegistry, ApplicationEventPublisherAware {
implements FunctionRegistry, FunctionInspector, ApplicationEventPublisherAware {
private final Map<Class<?>, Map<String, Object>> functions;
@Autowired(required = false)
private final Map<Object, FunctionRegistration<?>> registrations;
private ApplicationEventPublisher publisher;
public InMemoryFunctionCatalog() {
@@ -54,9 +54,15 @@ public class InMemoryFunctionCatalog
public InMemoryFunctionCatalog(Set<FunctionRegistration<?>> registrations) {
Assert.notNull(registrations, "'registrations' must not be null");
this.functions = new HashMap<>();
this.registrations = new HashMap<>();
registrations.stream().forEach(reg -> register(reg));
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
return this.registrations.get(function);
}
@Override
public <T> void register(FunctionRegistration<T> registration) {
FunctionRegistrationEvent event;
@@ -81,6 +87,15 @@ public class InMemoryFunctionCatalog
event = new FunctionRegistrationEvent(this, Object.class,
registration.getNames());
}
registrations.put(registration.getTarget(), registration);
FunctionRegistration<T> wrapped = registration.wrap();
if (wrapped != registration) {
registration = wrapped;
registrations.put(wrapped.getTarget(), wrapped);
if (type == Consumer.class) {
type = Function.class;
}
}
Map<String, Object> map = functions.computeIfAbsent(type, key -> new HashMap<>());
for (String name : registration.getNames()) {
map.put(name, registration.getTarget());

View File

@@ -0,0 +1,205 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.context.properties.ConfigurationBeanFactoryMetadata;
import org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.InMemoryFunctionCatalog;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigUtils;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
* @author Dave Syer
*
*/
public class ContextFunctionCatalogBeanRegistrar implements ApplicationContextAware,
ApplicationContextInitializer<GenericApplicationContext> {
private GenericApplicationContext context;
@Override
public void initialize(GenericApplicationContext applicationContext) {
if (applicationContext.getEnvironment().getProperty("spring.functional.enabled",
Boolean.class, false)) {
register(applicationContext);
}
}
public void register(BeanDefinitionRegistry registry) throws BeansException {
try {
register(registry, beanFactory(registry));
}
catch (BeansException e) {
throw e;
}
catch (RuntimeException e) {
throw e;
}
catch (Exception e) {
throw new BeanCreationException("Cannot register from " + getClass(), e);
}
}
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
Assert.isInstanceOf(GenericApplicationContext.class, context,
"Expecting a GenericApplicationContext");
this.context = (GenericApplicationContext) context;
}
private ConfigurableListableBeanFactory beanFactory(BeanDefinitionRegistry registry) {
if (this.context == null && registry instanceof AutowireCapableBeanFactory) {
((AutowireCapableBeanFactory) registry).initializeBean(this,
getClass().getSimpleName());
}
if (this.context == null && registry instanceof GenericApplicationContext) {
this.context = (GenericApplicationContext) registry;
}
if (this.context != null) {
return this.context.getDefaultListableBeanFactory();
}
if (registry instanceof ConfigurableListableBeanFactory) {
return (ConfigurableListableBeanFactory) registry;
}
if (registry instanceof ConfigurableApplicationContext) {
return ((ConfigurableApplicationContext) registry).getBeanFactory();
}
throw new IllegalStateException(
"Cannot locate ConfigurableListableBeanFactory in " + registry);
}
protected void register(BeanDefinitionRegistry registry,
ConfigurableListableBeanFactory factory) throws Exception {
performPreinitialization();
context.registerBean(PropertySourcesPlaceholderConfigurer.class,
() -> PropertyPlaceholderAutoConfiguration
.propertySourcesPlaceholderConfigurer());
context.registerBean(
AnnotationConfigUtils.AUTOWIRED_ANNOTATION_PROCESSOR_BEAN_NAME,
AutowiredAnnotationBeanPostProcessor.class);
context.registerBean(ConfigurationBeanFactoryMetadata.BEAN_NAME,
ConfigurationBeanFactoryMetadata.class,
() -> new ConfigurationBeanFactoryMetadata());
context.registerBean(ConfigurationPropertiesBindingPostProcessor.BEAN_NAME,
ConfigurationPropertiesBindingPostProcessor.class,
() -> new ConfigurationPropertiesBindingPostProcessor());
// TODO: use Boot to create Gson and ObjectMapper
if (ClassUtils.isPresent("com.google.gson.Gson", null)
&& !"gson".equals(context.getEnvironment().getProperty(
ContextFunctionCatalogAutoConfiguration.PREFERRED_MAPPER_PROPERTY,
"gson"))) {
context.registerBean(Gson.class, () -> new Gson());
context.registerBean(JsonMapper.class,
() -> new ContextFunctionCatalogAutoConfiguration.GsonConfiguration()
.jsonMapper(context.getBean(Gson.class)));
}
else if (ClassUtils.isPresent(
"com.fasterxml.jackson.databind.ObjectMapper.ObjectMapper", null)) {
context.registerBean(ObjectMapper.class, () -> new ObjectMapper());
context.registerBean(JsonMapper.class,
() -> new ContextFunctionCatalogAutoConfiguration.JacksonConfiguration()
.jsonMapper(context.getBean(ObjectMapper.class)));
}
context.registerBean(InMemoryFunctionCatalog.class,
() -> new InMemoryFunctionCatalog());
context.registerBean(FunctionRegistrationPostProcessor.class,
() -> new FunctionRegistrationPostProcessor(
context.getBean(FunctionRegistry.class)));
}
private void performPreinitialization() {
try {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
runSafely(() -> new DefaultFormattingConversionService());
}
public void runSafely(Runnable runnable) {
try {
runnable.run();
}
catch (Throwable ex) {
// Ignore
}
}
}, "background-preinit");
thread.start();
}
catch (Exception ex) {
}
}
private class FunctionRegistrationPostProcessor implements BeanPostProcessor {
private final FunctionRegistry catalog;
public FunctionRegistrationPostProcessor(FunctionRegistry catalog) {
this.catalog = catalog;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
if (bean instanceof FunctionRegistration) {
FunctionRegistration<?> registration = (FunctionRegistration<?>) bean;
if (registration.getNames().isEmpty()) {
registration = registration.name(beanName);
}
if (registration.getType() == null) {
throw new IllegalStateException(
"You need an explicit type for the function: " + beanName);
// TODO: in principle Spring could know how to extract this from the
// supplier, but in practice there is no functional bean registration
// with parametric types.
}
catalog.register(registration);
}
return bean;
}
}
}

View File

@@ -3,3 +3,6 @@ org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConf
org.springframework.cloud.function.context.WrapperDetector=\
org.springframework.cloud.function.context.config.FluxWrapperDetector
org.springframework.context.ApplicationContextInitializer=\
org.springframework.cloud.function.context.config.ContextFunctionCatalogBeanRegistrar