diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/AbstractSpringFunctionAdapterInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/AbstractSpringFunctionAdapterInitializer.java deleted file mode 100644 index f87c40aa7..000000000 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/AbstractSpringFunctionAdapterInitializer.java +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright 2019-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.adapter.gcp; - -import java.io.Closeable; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Supplier; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.WebApplicationType; -import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.cloud.function.context.FunctionRegistration; -import org.springframework.cloud.function.context.FunctionRegistry; -import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; -import org.springframework.cloud.function.context.config.FunctionContextUtils; -import org.springframework.cloud.function.context.config.JsonMessageConverter; -import org.springframework.cloud.function.context.config.RoutingFunction; -import org.springframework.cloud.function.context.config.SmartCompositeMessageConverter; -import org.springframework.cloud.function.json.JacksonMapper; -import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.cloud.function.utils.FunctionClassUtils; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.core.convert.support.GenericConversionService; -import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; - -/** - * Base implementation for adapter initializers and request handlers. - * - * @param the type of the target specific (native) context object. - * - * @author Oleg Zhurakousky - * @since 2.1 - * - * NOTE: Moved from core package. Onl used by GCP - */ -@SuppressWarnings("rawtypes") -abstract class AbstractSpringFunctionAdapterInitializer implements Closeable { - - private static Log logger = LogFactory.getLog(AbstractSpringFunctionAdapterInitializer.class); - - /** - * Name of the bean for registering the target execution context passed to `initialize(context)` operation. - */ - public static final String TARGET_EXECUTION_CTX_NAME = "executionContext"; - - private final Class configurationClass; - - private Function function; - - private Consumer consumer; - - private Supplier supplier; - - private FunctionRegistration functionRegistration; - - private AtomicBoolean initialized = new AtomicBoolean(); - - @Autowired(required = false) - protected FunctionCatalog catalog; - - @Autowired(required = false) - protected JsonMapper jsonMapper; - - private ConfigurableApplicationContext context; - - public ConfigurableApplicationContext getContext() { - return context; - } - - AbstractSpringFunctionAdapterInitializer(Class configurationClass) { - Assert.notNull(configurationClass, "'configurationClass' must not be null"); - this.configurationClass = configurationClass; - } - - AbstractSpringFunctionAdapterInitializer() { - this(FunctionClassUtils.getStartClass()); - } - - @Override - public void close() { - if (this.context != null) { - this.context.close(); - } - } - - protected void initialize(C targetContext) { - if (!this.initialized.compareAndSet(false, true)) { - return; - } - logger.info("Initializing: " + this.configurationClass); - SpringApplication builder = springApplication(); - ConfigurableApplicationContext context = builder.run(); - context.getAutowireCapableBeanFactory().autowireBean(this); - this.context = context; - if (this.catalog == null) { - SmartCompositeMessageConverter messageConverter = - new SmartCompositeMessageConverter(Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper())))); - this.catalog = new SimpleFunctionRegistry(new GenericConversionService(), - messageConverter, new JacksonMapper(new ObjectMapper())); - initFunctionConsumerOrSupplierFromContext(targetContext); - } - else { - initFunctionConsumerOrSupplierFromCatalog(targetContext); - } - } - - protected Class getInputType() { - - Object func = function(); - if (func != null && func instanceof FunctionInvocationWrapper) { - return FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) func).getInputType())); - } - if (functionRegistration != null) { - return FunctionTypeUtils.getRawType(FunctionTypeUtils.getInputType(functionRegistration.getType())); - } - return Object.class; - } - - @SuppressWarnings("unchecked") - protected Function, Publisher> getFunction() { - return function; - } - - protected Object function() { - if (this.function != null) { - return this.function; - } - else if (this.consumer != null) { - return this.consumer; - } - else if (this.supplier != null) { - return this.supplier; - } - return null; - } - - @SuppressWarnings("unchecked") - protected Publisher apply(Publisher input) { - if (this.function != null) { - Object result = this.function.apply(input); - if (result instanceof Publisher) { - return Flux.from((Publisher) result); - } - else { - return Flux.just(result); - } - } - if (this.consumer != null) { - this.consumer.accept(input); - return Flux.empty(); - } - if (this.supplier != null) { - Object result = this.supplier.get(); - if (!(result instanceof Publisher)) { - result = Mono.just(result); - } - return (Publisher) result; - } - throw new IllegalStateException("No function defined"); - } - - /** - * Allows you to resolve function name for cases where it - * could not be located under default name. - * - * Default implementation returns empty string. - * - * @param targetContext the target context instance - * @return the name of the function - */ - protected String doResolveName(Object targetContext) { - return ""; - } - - protected Object convertOutput(Object input, Object output) { - return output; - } - - protected O result(Object input, Publisher output) { - List result = new ArrayList<>(); - for (Object value : Flux.from(output).toIterable()) { - result.add(this.convertOutput(input, value)); - } - if (isSingleInput(getFunction(), input) && result.size() == 1) { - @SuppressWarnings("unchecked") - O value = (O) result.get(0); - return value; - } - if (isSingleOutput(getFunction(), input) && result.size() == 1) { - @SuppressWarnings("unchecked") - O value = (O) result.get(0); - return value; - } - @SuppressWarnings("unchecked") - O value = (O) result; - return CollectionUtils.isEmpty(result) ? null : value; - } - - protected void clear(String name) { - FunctionInvocationWrapper f = this.catalog.lookup(name); - if (f.isFunction()) { - this.function = f; - } - else if (f.isConsumer()) { - this.consumer = f; - } - else { - this.supplier = f; - } - } - - private boolean isSingleInput(Function function, Object input) { - if (!(input instanceof Collection)) { - return true; - } - - if (function != null) { - return Collection.class - .isAssignableFrom(((FunctionInvocationWrapper) function).getRawInputType()); - } - return ((Collection) input).size() <= 1; - } - - private boolean isSingleOutput(Function function, Object output) { - if (!(output instanceof Collection)) { - return true; - } - if (function != null) { - Class outputType = FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) function).getOutputType())); - return Collection.class.isAssignableFrom(outputType); - } - return ((Collection) output).size() <= 1; - } - - private String resolveName(Class type, Object targetContext) { - String functionName = context.getEnvironment().getProperty("function.name"); - if (functionName != null) { - return functionName; - } - else if (type.isAssignableFrom(Function.class)) { - return "function"; - } - else if (type.isAssignableFrom(Consumer.class)) { - return "consumer"; - } - else if (type.isAssignableFrom(Supplier.class)) { - return "supplier"; - } - throw new IllegalStateException("Unknown type " + type); - } - - @SuppressWarnings({ "unchecked" }) - private T getAndInstrumentFromContext(String name) { - this.functionRegistration = - new FunctionRegistration(context.getBean(name), name); - - Type type = FunctionContextUtils. - findType(name, this.context.getBeanFactory()); - - this.functionRegistration = functionRegistration.type(type); - - ((FunctionRegistry) this.catalog).register(functionRegistration); - return this.catalog.lookup(name); - } - - private void initFunctionConsumerOrSupplierFromContext(Object targetContext) { - String name = resolveName(Function.class, targetContext); - if (context.containsBean(name) && context.getBean(name) instanceof Function) { - this.function = getAndInstrumentFromContext(name); - return; - } - - name = resolveName(Consumer.class, targetContext); - if (context.containsBean(name) && context.getBean(name) instanceof Consumer) { - this.function = getAndInstrumentFromContext(name); // FluxConsumer or any other consumer wrapper is a Function - return; - } - - name = resolveName(Supplier.class, targetContext); - if (context.containsBean(name) && context.getBean(name) instanceof Supplier) { - this.supplier = getAndInstrumentFromContext(name); - return; - } - } - - private void initFunctionConsumerOrSupplierFromCatalog(Object targetContext) { - String name = resolveName(Function.class, targetContext); - this.function = this.catalog.lookup(Function.class, name); - if (this.function != null) { - return; - } - - name = resolveName(Consumer.class, targetContext); - this.consumer = this.catalog.lookup(Consumer.class, name); - if (this.consumer != null) { - return; - } - - name = resolveName(Supplier.class, targetContext); - this.supplier = this.catalog.lookup(Supplier.class, name); - if (this.supplier != null) { - return; - } - - - if (this.catalog.size() >= 1 && this.catalog.size() <= 2) { // we may have RoutingFunction function - String functionName = this.catalog.getNames(Function.class).stream() - .filter(n -> !n.equals(RoutingFunction.FUNCTION_NAME)) - .findFirst().orElseGet(() -> null); - if (functionName != null) { - this.function = this.catalog.lookup(Function.class, functionName); - return; - } - functionName = this.catalog.getNames(Supplier.class).stream() - .findFirst().orElseGet(() -> null); - if (functionName != null) { - this.supplier = this.catalog.lookup(Supplier.class, functionName); - return; - } - functionName = this.catalog.getNames(Consumer.class).stream() - .findFirst().orElseGet(() -> null); - if (functionName != null) { - this.consumer = this.catalog.lookup(Consumer.class, functionName); - return; - } - } - else { - name = this.doResolveName(targetContext); - this.function = this.catalog.lookup(Function.class, name); - if (this.function != null) { - return; - } - - this.consumer = this.catalog.lookup(Consumer.class, name); - if (this.consumer != null) { - return; - } - this.supplier = this.catalog.lookup(Supplier.class, name); - if (this.supplier != null) { - return; - } - } - } - - - private SpringApplication springApplication() { - Class sourceClass = this.configurationClass; - SpringApplication application = new org.springframework.cloud.function.context.FunctionalSpringApplication( - sourceClass); - application.setWebApplicationType(WebApplicationType.NONE); - return application; - } -} diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/FunctionInvoker.java index bc073f452..c921291de 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-gcp/src/main/java/org/springframework/cloud/function/adapter/gcp/FunctionInvoker.java @@ -20,7 +20,9 @@ import java.io.BufferedReader; import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Map.Entry; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import com.google.cloud.functions.Context; @@ -31,7 +33,15 @@ import com.google.cloud.functions.RawBackgroundFunction; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.WebApplicationType; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionalSpringApplication; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration; +import org.springframework.cloud.function.context.config.RoutingFunction; +import org.springframework.cloud.function.utils.FunctionClassUtils; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; @@ -46,10 +56,10 @@ import org.springframework.util.MimeTypeUtils; * @author Dmitry Solomakha * @author Mike Eltsufin * @author Oleg Zhurakousky + * @author Biju Kunjummen * @since 3.0.4 */ -public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer - implements HttpFunction, RawBackgroundFunction { +public class FunctionInvoker implements HttpFunction, RawBackgroundFunction { private static final Log log = LogFactory.getLog(FunctionInvoker.class); @@ -60,34 +70,38 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer configurationClass) { - super(configurationClass); - init(); + init(configurationClass); } - private void init() { - if (System.getenv().containsKey("spring.cloud.function.definition")) { - this.functionName = System.getenv("spring.cloud.function.definition"); - } - + private void init(Class configurationClass) { // Default to GSON if implementation not specified. if (!System.getenv().containsKey(ContextFunctionCatalogAutoConfiguration.JSON_MAPPER_PROPERTY)) { System.setProperty(ContextFunctionCatalogAutoConfiguration.JSON_MAPPER_PROPERTY, "gson"); } - Thread.currentThread() // TODO: remove after upgrading to 1.0.0-alpha-2-rc5 - .setContextClassLoader(FunctionInvoker.class.getClassLoader()); - initialize(null); + .setContextClassLoader(FunctionInvoker.class.getClassLoader()); + + log.info("Initializing: " + configurationClass); + SpringApplication springApplication = springApplication(configurationClass); + this.context = springApplication.run(); + this.catalog = this.context.getBean(FunctionCatalog.class); + initFunctionConsumerOrSupplierFromCatalog(); } private Function, Message> lookupFunction() { Function, Message> function = this.catalog.lookup(functionName, - MimeTypeUtils.APPLICATION_JSON.toString()); + MimeTypeUtils.APPLICATION_JSON.toString()); Assert.notNull(function, "'function' with name '" + functionName + "' must not be null"); return function; } @@ -98,10 +112,9 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer, Message> function = lookupFunction(); - Message message = getInputType() == Void.class || getInputType() == null ? null + Message message = this.functionWrapped.getInputType() == Void.class || this.functionWrapped.getInputType() == null ? null : MessageBuilder.withPayload(httpRequest.getReader()).copyHeaders(httpRequest.getHeaders()).build(); Message result = function.apply(message); @@ -135,16 +148,16 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer, Message> function = lookupFunction(); - Message message = getInputType() == Void.class ? null - : MessageBuilder.withPayload(json).setHeader("gcf_context", context).build(); + Message message = this.functionWrapped.getInputType() == Void.class ? null + : MessageBuilder.withPayload(json).setHeader("gcf_context", context).build(); Message result = function.apply(message); @@ -153,4 +166,60 @@ public class FunctionInvoker extends AbstractSpringFunctionAdapterInitializer type) { + if (System.getenv().containsKey("spring.cloud.function.definition")) { + return System.getenv("spring.cloud.function.definition"); + } + String functionName = this.context.getEnvironment().getProperty("function.name"); + if (functionName != null) { + return functionName; + } + else if (type.isAssignableFrom(Function.class)) { + return "function"; + } + else if (type.isAssignableFrom(Consumer.class)) { + return "consumer"; + } + else if (type.isAssignableFrom(Supplier.class)) { + return "supplier"; + } + throw new IllegalStateException("Unknown type " + type); + } + + private SpringApplication springApplication(Class configurationClass) { + SpringApplication application = new FunctionalSpringApplication(configurationClass); + application.setWebApplicationType(WebApplicationType.NONE); + return application; + } + }