GH-955 Removes GCP Invoker dependency on AbstractSpringFunctionAdapterInitializer

Restores classpath for classloader, function resolution

Resolves #955
Resolves #970
This commit is contained in:
Biju Kunjummen
2022-12-13 20:56:23 -08:00
committed by Oleg Zhurakousky
parent 25d524f395
commit bab07efa9f
2 changed files with 90 additions and 406 deletions

View File

@@ -1,385 +0,0 @@
/*
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.adapter.gcp;
import java.io.Closeable;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.FunctionContextUtils;
import org.springframework.cloud.function.context.config.JsonMessageConverter;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.utils.FunctionClassUtils;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.convert.support.GenericConversionService;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* Base implementation for adapter initializers and request handlers.
*
* @param <C> the type of the target specific (native) context object.
*
* @author Oleg Zhurakousky
* @since 2.1
*
* NOTE: Moved from core package. Onl used by GCP
*/
@SuppressWarnings("rawtypes")
abstract class AbstractSpringFunctionAdapterInitializer<C> implements Closeable {
private static Log logger = LogFactory.getLog(AbstractSpringFunctionAdapterInitializer.class);
/**
* Name of the bean for registering the target execution context passed to `initialize(context)` operation.
*/
public static final String TARGET_EXECUTION_CTX_NAME = "executionContext";
private final Class<?> configurationClass;
private Function function;
private Consumer consumer;
private Supplier supplier;
private FunctionRegistration<?> functionRegistration;
private AtomicBoolean initialized = new AtomicBoolean();
@Autowired(required = false)
protected FunctionCatalog catalog;
@Autowired(required = false)
protected JsonMapper jsonMapper;
private ConfigurableApplicationContext context;
public ConfigurableApplicationContext getContext() {
return context;
}
AbstractSpringFunctionAdapterInitializer(Class<?> configurationClass) {
Assert.notNull(configurationClass, "'configurationClass' must not be null");
this.configurationClass = configurationClass;
}
AbstractSpringFunctionAdapterInitializer() {
this(FunctionClassUtils.getStartClass());
}
@Override
public void close() {
if (this.context != null) {
this.context.close();
}
}
protected void initialize(C targetContext) {
if (!this.initialized.compareAndSet(false, true)) {
return;
}
logger.info("Initializing: " + this.configurationClass);
SpringApplication builder = springApplication();
ConfigurableApplicationContext context = builder.run();
context.getAutowireCapableBeanFactory().autowireBean(this);
this.context = context;
if (this.catalog == null) {
SmartCompositeMessageConverter messageConverter =
new SmartCompositeMessageConverter(Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper()))));
this.catalog = new SimpleFunctionRegistry(new GenericConversionService(),
messageConverter, new JacksonMapper(new ObjectMapper()));
initFunctionConsumerOrSupplierFromContext(targetContext);
}
else {
initFunctionConsumerOrSupplierFromCatalog(targetContext);
}
}
protected Class<?> getInputType() {
Object func = function();
if (func != null && func instanceof FunctionInvocationWrapper) {
return FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) func).getInputType()));
}
if (functionRegistration != null) {
return FunctionTypeUtils.getRawType(FunctionTypeUtils.getInputType(functionRegistration.getType()));
}
return Object.class;
}
@SuppressWarnings("unchecked")
protected Function<Publisher<?>, Publisher<?>> getFunction() {
return function;
}
protected Object function() {
if (this.function != null) {
return this.function;
}
else if (this.consumer != null) {
return this.consumer;
}
else if (this.supplier != null) {
return this.supplier;
}
return null;
}
@SuppressWarnings("unchecked")
protected Publisher<?> apply(Publisher<?> input) {
if (this.function != null) {
Object result = this.function.apply(input);
if (result instanceof Publisher) {
return Flux.from((Publisher) result);
}
else {
return Flux.just(result);
}
}
if (this.consumer != null) {
this.consumer.accept(input);
return Flux.empty();
}
if (this.supplier != null) {
Object result = this.supplier.get();
if (!(result instanceof Publisher)) {
result = Mono.just(result);
}
return (Publisher<?>) result;
}
throw new IllegalStateException("No function defined");
}
/**
* Allows you to resolve function name for cases where it
* could not be located under default name.
*
* Default implementation returns empty string.
*
* @param targetContext the target context instance
* @return the name of the function
*/
protected String doResolveName(Object targetContext) {
return "";
}
protected Object convertOutput(Object input, Object output) {
return output;
}
protected <O> O result(Object input, Publisher<?> output) {
List<Object> result = new ArrayList<>();
for (Object value : Flux.from(output).toIterable()) {
result.add(this.convertOutput(input, value));
}
if (isSingleInput(getFunction(), input) && result.size() == 1) {
@SuppressWarnings("unchecked")
O value = (O) result.get(0);
return value;
}
if (isSingleOutput(getFunction(), input) && result.size() == 1) {
@SuppressWarnings("unchecked")
O value = (O) result.get(0);
return value;
}
@SuppressWarnings("unchecked")
O value = (O) result;
return CollectionUtils.isEmpty(result) ? null : value;
}
protected void clear(String name) {
FunctionInvocationWrapper f = this.catalog.lookup(name);
if (f.isFunction()) {
this.function = f;
}
else if (f.isConsumer()) {
this.consumer = f;
}
else {
this.supplier = f;
}
}
private boolean isSingleInput(Function<?, ?> function, Object input) {
if (!(input instanceof Collection)) {
return true;
}
if (function != null) {
return Collection.class
.isAssignableFrom(((FunctionInvocationWrapper) function).getRawInputType());
}
return ((Collection<?>) input).size() <= 1;
}
private boolean isSingleOutput(Function<?, ?> function, Object output) {
if (!(output instanceof Collection)) {
return true;
}
if (function != null) {
Class<?> outputType = FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) function).getOutputType()));
return Collection.class.isAssignableFrom(outputType);
}
return ((Collection<?>) output).size() <= 1;
}
private String resolveName(Class<?> type, Object targetContext) {
String functionName = context.getEnvironment().getProperty("function.name");
if (functionName != null) {
return functionName;
}
else if (type.isAssignableFrom(Function.class)) {
return "function";
}
else if (type.isAssignableFrom(Consumer.class)) {
return "consumer";
}
else if (type.isAssignableFrom(Supplier.class)) {
return "supplier";
}
throw new IllegalStateException("Unknown type " + type);
}
@SuppressWarnings({ "unchecked" })
private <T> T getAndInstrumentFromContext(String name) {
this.functionRegistration =
new FunctionRegistration(context.getBean(name), name);
Type type = FunctionContextUtils.
findType(name, this.context.getBeanFactory());
this.functionRegistration = functionRegistration.type(type);
((FunctionRegistry) this.catalog).register(functionRegistration);
return this.catalog.lookup(name);
}
private void initFunctionConsumerOrSupplierFromContext(Object targetContext) {
String name = resolveName(Function.class, targetContext);
if (context.containsBean(name) && context.getBean(name) instanceof Function) {
this.function = getAndInstrumentFromContext(name);
return;
}
name = resolveName(Consumer.class, targetContext);
if (context.containsBean(name) && context.getBean(name) instanceof Consumer) {
this.function = getAndInstrumentFromContext(name); // FluxConsumer or any other consumer wrapper is a Function
return;
}
name = resolveName(Supplier.class, targetContext);
if (context.containsBean(name) && context.getBean(name) instanceof Supplier) {
this.supplier = getAndInstrumentFromContext(name);
return;
}
}
private void initFunctionConsumerOrSupplierFromCatalog(Object targetContext) {
String name = resolveName(Function.class, targetContext);
this.function = this.catalog.lookup(Function.class, name);
if (this.function != null) {
return;
}
name = resolveName(Consumer.class, targetContext);
this.consumer = this.catalog.lookup(Consumer.class, name);
if (this.consumer != null) {
return;
}
name = resolveName(Supplier.class, targetContext);
this.supplier = this.catalog.lookup(Supplier.class, name);
if (this.supplier != null) {
return;
}
if (this.catalog.size() >= 1 && this.catalog.size() <= 2) { // we may have RoutingFunction function
String functionName = this.catalog.getNames(Function.class).stream()
.filter(n -> !n.equals(RoutingFunction.FUNCTION_NAME))
.findFirst().orElseGet(() -> null);
if (functionName != null) {
this.function = this.catalog.lookup(Function.class, functionName);
return;
}
functionName = this.catalog.getNames(Supplier.class).stream()
.findFirst().orElseGet(() -> null);
if (functionName != null) {
this.supplier = this.catalog.lookup(Supplier.class, functionName);
return;
}
functionName = this.catalog.getNames(Consumer.class).stream()
.findFirst().orElseGet(() -> null);
if (functionName != null) {
this.consumer = this.catalog.lookup(Consumer.class, functionName);
return;
}
}
else {
name = this.doResolveName(targetContext);
this.function = this.catalog.lookup(Function.class, name);
if (this.function != null) {
return;
}
this.consumer = this.catalog.lookup(Consumer.class, name);
if (this.consumer != null) {
return;
}
this.supplier = this.catalog.lookup(Supplier.class, name);
if (this.supplier != null) {
return;
}
}
}
private SpringApplication springApplication() {
Class<?> sourceClass = this.configurationClass;
SpringApplication application = new org.springframework.cloud.function.context.FunctionalSpringApplication(
sourceClass);
application.setWebApplicationType(WebApplicationType.NONE);
return application;
}
}

View File

@@ -20,7 +20,9 @@ import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.cloud.functions.Context;
@@ -31,7 +33,15 @@ import com.google.cloud.functions.RawBackgroundFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionalSpringApplication;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.utils.FunctionClassUtils;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@@ -46,10 +56,10 @@ import org.springframework.util.MimeTypeUtils;
* @author Dmitry Solomakha
* @author Mike Eltsufin
* @author Oleg Zhurakousky
* @author Biju Kunjummen
* @since 3.0.4
*/
public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer<HttpRequest>
implements HttpFunction, RawBackgroundFunction {
public class FunctionInvoker implements HttpFunction, RawBackgroundFunction {
private static final Log log = LogFactory.getLog(FunctionInvoker.class);
@@ -60,34 +70,38 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer<Ht
private String functionName = "";
protected FunctionCatalog catalog;
private FunctionInvocationWrapper functionWrapped;
private ConfigurableApplicationContext context;
public FunctionInvoker() {
super();
init();
this(FunctionClassUtils.getStartClass());
}
public FunctionInvoker(Class<?> configurationClass) {
super(configurationClass);
init();
init(configurationClass);
}
private void init() {
if (System.getenv().containsKey("spring.cloud.function.definition")) {
this.functionName = System.getenv("spring.cloud.function.definition");
}
private void init(Class<?> configurationClass) {
// Default to GSON if implementation not specified.
if (!System.getenv().containsKey(ContextFunctionCatalogAutoConfiguration.JSON_MAPPER_PROPERTY)) {
System.setProperty(ContextFunctionCatalogAutoConfiguration.JSON_MAPPER_PROPERTY, "gson");
}
Thread.currentThread() // TODO: remove after upgrading to 1.0.0-alpha-2-rc5
.setContextClassLoader(FunctionInvoker.class.getClassLoader());
initialize(null);
.setContextClassLoader(FunctionInvoker.class.getClassLoader());
log.info("Initializing: " + configurationClass);
SpringApplication springApplication = springApplication(configurationClass);
this.context = springApplication.run();
this.catalog = this.context.getBean(FunctionCatalog.class);
initFunctionConsumerOrSupplierFromCatalog();
}
private <I> Function<Message<I>, Message<byte[]>> lookupFunction() {
Function<Message<I>, Message<byte[]>> function = this.catalog.lookup(functionName,
MimeTypeUtils.APPLICATION_JSON.toString());
MimeTypeUtils.APPLICATION_JSON.toString());
Assert.notNull(function, "'function' with name '" + functionName + "' must not be null");
return function;
}
@@ -98,10 +112,9 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer<Ht
*/
@Override
public void service(HttpRequest httpRequest, HttpResponse httpResponse) throws Exception {
Function<Message<BufferedReader>, Message<byte[]>> function = lookupFunction();
Message<BufferedReader> message = getInputType() == Void.class || getInputType() == null ? null
Message<BufferedReader> message = this.functionWrapped.getInputType() == Void.class || this.functionWrapped.getInputType() == null ? null
: MessageBuilder.withPayload(httpRequest.getReader()).copyHeaders(httpRequest.getHeaders()).build();
Message<byte[]> result = function.apply(message);
@@ -135,16 +148,16 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer<Ht
/**
* The implementation of a GCF {@link RawBackgroundFunction} that will be used as the
* entry point from GCF.
* @param json the payload.
*
* @param json the payload.
* @param context event context.
* @since 3.0.5
*/
@Override
public void accept(String json, Context context) {
Function<Message<String>, Message<byte[]>> function = lookupFunction();
Message<String> message = getInputType() == Void.class ? null
: MessageBuilder.withPayload(json).setHeader("gcf_context", context).build();
Message<String> message = this.functionWrapped.getInputType() == Void.class ? null
: MessageBuilder.withPayload(json).setHeader("gcf_context", context).build();
Message<byte[]> result = function.apply(message);
@@ -153,4 +166,60 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer<Ht
}
}
private void initFunctionConsumerOrSupplierFromCatalog() {
String name = resolveName(Function.class);
this.functionWrapped = this.catalog.lookup(Function.class, name);
if (this.functionWrapped != null) {
this.functionName = name;
return;
}
name = resolveName(Consumer.class);
this.functionWrapped = this.catalog.lookup(Consumer.class, name);
if (this.functionWrapped != null) {
this.functionName = name;
return;
}
name = resolveName(Supplier.class);
this.functionWrapped = this.catalog.lookup(Supplier.class, name);
if (this.functionWrapped != null) {
this.functionName = name;
return;
}
// Default to Routing Function
this.functionWrapped = this.catalog.lookup(RoutingFunction.FUNCTION_NAME, "application/json");
if (this.functionWrapped != null) {
this.functionName = RoutingFunction.FUNCTION_NAME;
}
Assert.notNull(this.functionWrapped, "Couldn't resolve a handler function");
}
private String resolveName(Class<?> type) {
if (System.getenv().containsKey("spring.cloud.function.definition")) {
return System.getenv("spring.cloud.function.definition");
}
String functionName = this.context.getEnvironment().getProperty("function.name");
if (functionName != null) {
return functionName;
}
else if (type.isAssignableFrom(Function.class)) {
return "function";
}
else if (type.isAssignableFrom(Consumer.class)) {
return "consumer";
}
else if (type.isAssignableFrom(Supplier.class)) {
return "supplier";
}
throw new IllegalStateException("Unknown type " + type);
}
private SpringApplication springApplication(Class<?> configurationClass) {
SpringApplication application = new FunctionalSpringApplication(configurationClass);
application.setWebApplicationType(WebApplicationType.NONE);
return application;
}
}