Added support for class loader to share common transport packages such as reactor

Simplify user exposure to deploying archives
This commit is contained in:
Oleg Zhurakousky
2019-08-10 14:44:35 +02:00
parent 3d67f84144
commit 3349c3abed
9 changed files with 433 additions and 530 deletions

View File

@@ -435,7 +435,7 @@ public class BeanFactoryAwareFunctionRegistry
}
private Object convertOutputValueIfNecessary(Object value, String... acceptedOutputMimeTypes) {
logger.info("Applying type conversion on output value");
logger.debug("Applying type conversion on output value");
Object convertedValue = null;
if (FunctionTypeUtils.isMultipleArgumentsHolder(value)) {
int outputCount = FunctionTypeUtils.getOutputCount(this.functionType);
@@ -471,8 +471,8 @@ public class BeanFactoryAwareFunctionRegistry
}
private Publisher<?> convertOutputPublisherIfNecessary(Publisher<?> publisher, String... acceptedOutputMimeTypes) {
if (logger.isInfoEnabled()) {
logger.info("Applying type conversion on output Publisher " + publisher);
if (logger.isDebugEnabled()) {
logger.debug("Applying type conversion on output Publisher " + publisher);
}
Publisher<?> result = publisher instanceof Mono
@@ -482,8 +482,8 @@ public class BeanFactoryAwareFunctionRegistry
}
private Publisher<?> convertInputPublisherIfNecessary(Publisher<?> publisher, Type type) {
if (logger.isInfoEnabled()) {
logger.info("Applying type conversion on input Publisher " + publisher);
if (logger.isDebugEnabled()) {
logger.debug("Applying type conversion on input Publisher " + publisher);
}
Publisher<?> result = publisher instanceof Mono
@@ -493,9 +493,9 @@ public class BeanFactoryAwareFunctionRegistry
}
private Object convertInputValueIfNecessary(Object value, Type type) {
if (logger.isInfoEnabled()) {
logger.info("Applying type conversion on input value ");
logger.info("Function type: " + this.functionType);
if (logger.isDebugEnabled()) {
logger.debug("Applying type conversion on input value " + value);
logger.debug("Function type: " + this.functionType);
}
Object convertedValue = value;

View File

@@ -12,27 +12,4 @@ public class SimpleFunctionAppApplication {
public static void main(String[] args) {
SpringApplication.run(SimpleFunctionAppApplication.class, args);
}
public static class Person {
private String name;
private int id;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
}
}

View File

@@ -1,63 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*/
public abstract class ApplicationContainer {
private final FunctionCatalog functionCatalog;
private final FunctionInspector functionInspector;
private final FunctionProperties functionProperties;
public ApplicationContainer(FunctionCatalog functionCatalog,
FunctionInspector functionInspector, FunctionProperties functionProperties) {
this.functionCatalog = functionCatalog;
this.functionInspector = functionInspector;
this.functionProperties = functionProperties;
}
protected FunctionCatalog getFunctionCatalog() {
return this.functionCatalog;
}
protected FunctionInspector getFunctionInspector() {
return this.functionInspector;
}
protected FunctionProperties getFunctionProperties() {
return this.functionProperties;
}
@SuppressWarnings("unchecked")
public <T> T getFunction() {
return (T) this.functionCatalog.lookup(this.functionProperties.getFunctionName());
}
@SuppressWarnings("unchecked")
public <T> T getFunction(String... acceptedOutputMimeTypes) {
return (T) this.functionCatalog.lookup(this.functionProperties.getFunctionName(), acceptedOutputMimeTypes);
}
}

View File

@@ -1,236 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.loader.JarLauncher;
import org.springframework.boot.loader.LaunchedURLClassLoader;
import org.springframework.boot.loader.archive.Archive;
import org.springframework.boot.loader.jar.JarFile;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ReflectionUtils.MethodCallback;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*
*/
class ExternalFunctionJarLauncher extends JarLauncher {
private static Log logger = LogFactory.getLog(ExternalFunctionJarLauncher.class);
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
private final Archive archive;
private final boolean applicationWithMain;
ExternalFunctionJarLauncher(Archive archive) {
super(archive);
this.archive = archive;
this.applicationWithMain = this.isBootApplicationWithMain();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
protected void deploy(FunctionRegistry functionRegistry, FunctionProperties functionProperties, String[] args) {
ClassLoader currentLoader = Thread.currentThread().getContextClassLoader();
try {
this.doLaunch(args);
Map<String, Object> functions = this.discoverFunctions();
if (logger.isInfoEnabled()) {
logger.info("Discovered functions: " + functions);
}
for (Entry<String, Object> entry : functions.entrySet()) {
FunctionRegistration registration = new FunctionRegistration(entry.getValue(), entry.getKey());
Type type = this.findType(entry.getKey());
if (logger.isInfoEnabled()) {
logger.info("Registering function '" + entry.getKey() + "' of type '" + type
+ "' in FunctionRegistry.");
}
registration.type(type);
functionRegistry.register(registration);
}
FunctionRegistration registration = this.discovereAndLoadFunctionFromClassName(functionProperties);
if (registration != null) {
functionRegistry.register(registration);
}
}
catch (Exception e) {
throw new IllegalStateException("Failed to deploy archive " + archive, e);
}
finally {
Thread.currentThread().setContextClassLoader(currentLoader);
}
}
@Override
protected ClassLoader createClassLoader(URL[] urls) throws Exception {
String className = DeployerContextUtils.class.getName();
String classAsPath = className.replace('.', '/') + ".class";
byte[] fcuBytes = StreamUtils
.copyToByteArray(DeployerContextUtils.class.getClassLoader().getResourceAsStream(classAsPath));
/*
* While LaunchedURLClassLoader is completely disconnected with the current
* class loader, this will still allow it to see FunctionContextUtils
*/
return new LaunchedURLClassLoader(urls, getClass().getClassLoader().getParent()) {
boolean functionContextUtilsLoaded;
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
if (!ExternalFunctionJarLauncher.this.applicationWithMain) {
try {
return getClass().getClassLoader().loadClass(name);
}
catch (Exception e) {
// ignore and proceed with context ClassLoader
}
}
return super.loadClass(name, false);
}
@Override
protected Class<?> findClass(final String name) throws ClassNotFoundException {
if (name.startsWith("reactor.")) {
System.out.println();
}
if (!functionContextUtilsLoaded && className.equals(name)) {
Class<?> fcuClass = defineClass(name, fcuBytes, 0, fcuBytes.length);
this.functionContextUtilsLoaded = true;
return fcuClass;
}
return super.findClass(name);
}
};
}
private FunctionRegistration<?> discovereAndLoadFunctionFromClassName(FunctionProperties functionProperties) throws Exception {
FunctionRegistration<?> functionRegistration = null;
AtomicReference<Type> typeRef = new AtomicReference<>();
if (StringUtils.hasText(functionProperties.getFunctionClass())) {
System.out.println("=====> " + Thread.currentThread().getContextClassLoader());
Class<?> functionClass = Thread.currentThread().getContextClassLoader().loadClass(functionProperties.getFunctionClass());
ReflectionUtils.doWithMethods(functionClass, new MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
typeRef.set(FunctionTypeUtils.getFunctionTypeFromFunctionMethod(method));
}
}, new MethodFilter() {
@Override
public boolean matches(Method method) {
String name = method.getName();
return typeRef.get() == null && !method.isBridge()
&& ("apply".equals(name) || "accept".equals(name) || "get".equals(name));
}
});
if (typeRef.get() != null) {
Object functionInstance = functionClass.newInstance();
functionRegistration = new FunctionRegistration<>(functionInstance,
StringUtils.uncapitalize(functionClass.getSimpleName()));
functionRegistration.type(typeRef.get());
}
}
return functionRegistration;
}
protected boolean isBootApplicationWithMain() {
try {
return StringUtils.hasText(this.archive.getManifest().getMainAttributes().getValue("Start-Class"));
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
private void doLaunch(String[] args) throws Exception {
JarFile.registerUrlProtocolHandler();
Thread.currentThread().setContextClassLoader(createClassLoader(getClassPathArchives()));
System.out.println("=====> " + Thread.currentThread().getContextClassLoader());
evalContext.setTypeLocator(new StandardTypeLocator(Thread.currentThread().getContextClassLoader()));
if (this.isBootApplicationWithMain()) {
String mainClassName = getMainClass();
Class<?> mainClass = Thread.currentThread().getContextClassLoader().loadClass(mainClassName);
Class<?> bootAppClass = Thread.currentThread().getContextClassLoader()
.loadClass(SpringApplication.class.getName());
Method runMethod = bootAppClass.getDeclaredMethod("run", Class.class, String[].class);
Object applicationContext = runMethod.invoke(null, mainClass, (Object) args);
if (logger.isInfoEnabled()) {
logger.info("Application context for archive '" + archive.getUrl() + "' is created.");
}
evalContext.setVariable("context", applicationContext);
setBeanFactory(applicationContext);
}
}
private void setBeanFactory(Object applicationContext) throws Exception {
Expression parsed = new SpelExpressionParser().parseExpression("#context.getBeanFactory()");
Object beanFactory = parsed.getValue(evalContext);
evalContext.setVariable("bf", beanFactory);
}
private Type findType(String name) {
evalContext.setVariable("functionName", name);
String expr = "T(" + DeployerContextUtils.class.getName() + ").findType(#bf, #functionName)";
Expression parsed = new SpelExpressionParser().parseExpression(expr);
Object type = parsed.getValue(evalContext);
return (Type) type;
}
@SuppressWarnings("unchecked")
private Map<String, Object> discoverFunctions() throws Exception {
Map<String, Object> allFunctions = new HashMap<String, Object>();
if (evalContext.lookupVariable("context") != null) { // no start0class uber jars
Expression parsed = new SpelExpressionParser()
.parseExpression("#context.getBeansOfType(T(java.util.function.Function))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(evalContext));
parsed = new SpelExpressionParser().parseExpression("#context.getBeansOfType(T(java.util.function.Supplier))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(evalContext));
parsed = new SpelExpressionParser().parseExpression("#context.getBeansOfType(T(java.util.function.Consumer))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(evalContext));
}
return allFunctions;
}
}

View File

@@ -0,0 +1,250 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.loader.JarLauncher;
import org.springframework.boot.loader.LaunchedURLClassLoader;
import org.springframework.boot.loader.archive.Archive;
import org.springframework.boot.loader.jar.JarFile;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.ReflectionUtils.MethodCallback;
import org.springframework.util.ReflectionUtils.MethodFilter;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*
*/
class FunctionArchiveDeployer extends JarLauncher {
private static Log logger = LogFactory.getLog(FunctionArchiveDeployer.class);
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
private LaunchedURLClassLoader archiveLoader;
FunctionArchiveDeployer(Archive archive) {
super(archive);
}
void undeploy() {
this.stopDeployedApplicationContext();
try {
this.archiveLoader.close();
logger.info("Closed archive class loader");
}
catch (IOException e) {
logger.error("Failed to closed archive class loader", e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
protected void deploy(FunctionRegistry functionRegistry, FunctionProperties functionProperties, String[] args) {
ClassLoader currentLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(createClassLoader(getClassPathArchives()));
evalContext.setTypeLocator(new StandardTypeLocator(Thread.currentThread().getContextClassLoader()));
if (this.isBootApplicationWithMain()) {
this.launchFunctionArchive(args);
Map<String, Object> functions = this.discoverBeanFunctions();
if (logger.isInfoEnabled() && !CollectionUtils.isEmpty(functions)) {
logger.info("Discovered functions in deployed application context: " + functions);
}
for (Entry<String, Object> entry : functions.entrySet()) {
FunctionRegistration registration = new FunctionRegistration(entry.getValue(), entry.getKey());
Type type = this.discoverFunctionType(entry.getKey());
if (logger.isInfoEnabled()) {
logger.info("Registering function '" + entry.getKey() + "' of type '" + type
+ "' in FunctionRegistry.");
}
registration.type(type);
functionRegistry.register(registration);
}
}
if (!StringUtils.isEmpty(functionProperties.getFunctionClass())) {
FunctionRegistration registration = this.discovereAndLoadFunctionFromClassName(functionProperties.getFunctionClass());
if (registration != null) {
functionRegistry.register(registration);
}
}
}
catch (Exception e) {
throw new IllegalStateException("Failed to deploy archive " + this.getArchive(), e);
}
finally {
Thread.currentThread().setContextClassLoader(currentLoader);
}
}
@Override
protected ClassLoader createClassLoader(URL[] urls) throws Exception {
String classAsPath = DeployerContextUtils.class.getName().replace('.', '/') + ".class";
byte[] deployerContextUtilsBytes = StreamUtils
.copyToByteArray(DeployerContextUtils.class.getClassLoader().getResourceAsStream(classAsPath));
/*
* While LaunchedURLClassLoader is completely disconnected with the current
* class loader, this will ensure that classes in org.reactivestreams.* and reactor.*
* are shared across two class loaders since they are effectively used as transport.
*/
this.archiveLoader = new LaunchedURLClassLoader(urls, null) {
@Override
public Class<?> loadClass(String name) throws ClassNotFoundException {
Class<?> clazz = null;
if (name.startsWith("org.reactivestreams") || name.startsWith("reactor.")) {
clazz = getClass().getClassLoader().loadClass(name);
}
else if (name.equals(DeployerContextUtils.class.getName())) {
/*
* This will ensure that `DeployerContextUtils` is available to
* foreign class loader even in cases where foreign JAR does not
* have SCF dependencies.
*/
try {
clazz = super.loadClass(name, false);
}
catch (Exception e) {
clazz = defineClass(name, deployerContextUtilsBytes, 0, deployerContextUtilsBytes.length);
}
}
else {
clazz = super.loadClass(name, false);
}
return clazz;
}
};
return this.archiveLoader;
}
protected boolean isBootApplicationWithMain() {
try {
return StringUtils.hasText(this.getArchive().getManifest().getMainAttributes().getValue("Start-Class"));
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
private FunctionRegistration<?> discovereAndLoadFunctionFromClassName(String functionClassName) throws Exception {
FunctionRegistration<?> functionRegistration = null;
AtomicReference<Type> typeRef = new AtomicReference<>();
Class<?> functionClass = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
ReflectionUtils.doWithMethods(functionClass, new MethodCallback() {
@Override
public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
typeRef.set(FunctionTypeUtils.getFunctionTypeFromFunctionMethod(method));
}
}, new MethodFilter() {
@Override
public boolean matches(Method method) {
String name = method.getName();
return typeRef.get() == null && !method.isBridge()
&& ("apply".equals(name) || "accept".equals(name) || "get".equals(name));
}
});
if (typeRef.get() != null) {
Object functionInstance = functionClass.newInstance();
functionRegistration = new FunctionRegistration<>(functionInstance,
StringUtils.uncapitalize(functionClass.getSimpleName()));
functionRegistration.type(typeRef.get());
}
return functionRegistration;
}
private void launchFunctionArchive(String[] args) throws Exception {
JarFile.registerUrlProtocolHandler();
String mainClassName = getMainClass();
Class<?> mainClass = Thread.currentThread().getContextClassLoader().loadClass(mainClassName);
Class<?> bootAppClass = Thread.currentThread().getContextClassLoader()
.loadClass(SpringApplication.class.getName());
Method runMethod = bootAppClass.getDeclaredMethod("run", Class.class, String[].class);
Object applicationContext = runMethod.invoke(null, mainClass, (Object) args);
if (logger.isInfoEnabled()) {
logger.info("Application context for archive '" + this.getArchive().getUrl() + "' is created.");
}
evalContext.setVariable("context", applicationContext);
setBeanFactory(applicationContext);
}
private void setBeanFactory(Object applicationContext) {
Expression parsed = new SpelExpressionParser().parseExpression("#context.getBeanFactory()");
Object beanFactory = parsed.getValue(this.evalContext);
evalContext.setVariable("bf", beanFactory);
}
private Type discoverFunctionType(String name) {
evalContext.setVariable("functionName", name);
String expr = "T(" + DeployerContextUtils.class.getName() + ").findType(#bf, #functionName)";
Expression parsed = new SpelExpressionParser().parseExpression(expr);
Object type = parsed.getValue(this.evalContext);
return (Type) type;
}
private void stopDeployedApplicationContext() {
if (evalContext.lookupVariable("context") != null) { // no start-class uber jars
Expression parsed = new SpelExpressionParser().parseExpression("#context.stop()");
parsed.getValue(this.evalContext);
}
}
@SuppressWarnings("unchecked")
private Map<String, Object> discoverBeanFunctions() {
Map<String, Object> allFunctions = new HashMap<String, Object>();
if (evalContext.lookupVariable("context") != null) { // no start-class uber jars
Expression parsed = new SpelExpressionParser()
.parseExpression("#context.getBeansOfType(T(java.util.function.Function))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(this.evalContext));
parsed = new SpelExpressionParser().parseExpression("#context.getBeansOfType(T(java.util.function.Supplier))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(this.evalContext));
parsed = new SpelExpressionParser().parseExpression("#context.getBeansOfType(T(java.util.function.Consumer))");
allFunctions.putAll((Map<String, Object>) parsed.getValue(this.evalContext));
}
return allFunctions;
}
}

View File

@@ -1,85 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.File;
import java.lang.reflect.Constructor;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.loader.archive.Archive;
import org.springframework.boot.loader.archive.JarFileArchive;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*/
@SpringBootApplication
@EnableConfigurationProperties(FunctionProperties.class)
public class FunctionDeployerBootstrap implements ApplicationContextAware {
public static FunctionDeployerBootstrap instance(String... args) {
ApplicationContext context = SpringApplication.run(FunctionDeployerBootstrap.class, args);
return context.getBean(FunctionDeployerBootstrap.class);
}
private ConfigurableApplicationContext applicationContext;
@Autowired
private FunctionProperties functionProperties;
@Autowired
private FunctionCatalog functionCatalog;
@Autowired
private FunctionInspector functionInspector;
@SuppressWarnings("unchecked")
public <T extends ApplicationContainer> T run(Class<?> configurationClass, String... args) {
try {
Archive archive = new JarFileArchive(new File(functionProperties.getLocation()));
ExternalFunctionJarLauncher launcher = new ExternalFunctionJarLauncher(archive);
launcher.deploy(this.applicationContext.getBean(FunctionRegistry.class), this.applicationContext.getBean(FunctionProperties.class), args);
Constructor<? extends ApplicationContainer> applicationContainerCtr = (Constructor<? extends ApplicationContainer>) configurationClass
.getDeclaredConstructor(FunctionCatalog.class, FunctionInspector.class, FunctionProperties.class);
ApplicationContainer applicationContainer = applicationContainerCtr.newInstance(this.functionCatalog,
this.functionInspector, this.functionProperties);
return (T) applicationContainer;
}
catch (Exception e) {
throw new IllegalStateException("Failed to launch archive: " + functionProperties.getLocation(), e);
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
}

View File

@@ -19,13 +19,16 @@ package org.springframework.cloud.function.deployer;
import java.io.File;
import java.io.IOException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.loader.archive.Archive;
import org.springframework.boot.loader.archive.JarFileArchive;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
/**
@@ -37,24 +40,56 @@ import org.springframework.context.annotation.Bean;
*/
@EnableAutoConfiguration
@EnableConfigurationProperties(FunctionProperties.class)
public class FunctionDeployerConfiguration {
public final class FunctionDeployerConfiguration {
private static Log logger = LogFactory.getLog(FunctionDeployerConfiguration.class);
@Bean
public SmartInitializingSingleton functionDeployer(FunctionProperties functionProperties,
SmartLifecycle functionArchiveDeployer(FunctionProperties functionProperties,
FunctionRegistry functionRegistry, ApplicationArguments arguments) {
return new SmartInitializingSingleton() {
Archive archive = null;
try {
archive = new JarFileArchive(new File(functionProperties.getLocation()));
}
catch (IOException e) {
throw new IllegalStateException("Failed to create archive: " + functionProperties.getLocation(), e);
}
FunctionArchiveDeployer deployer = new FunctionArchiveDeployer(archive);
return new SmartLifecycle() {
private boolean running;
@Override
public void afterSingletonsInstantiated() {
Archive archive = null;
try {
archive = new JarFileArchive(new File(functionProperties.getLocation()));
public void stop() {
if (logger.isInfoEnabled()) {
logger.info("Undeploying archive: " + functionProperties.getLocation());
}
catch (IOException e) {
throw new IllegalStateException("Failed to create archive: " + functionProperties.getLocation(), e);
deployer.undeploy();
if (logger.isInfoEnabled()) {
logger.info("Successfully undeployed archive: " + functionProperties.getLocation());
}
ExternalFunctionJarLauncher launcher = new ExternalFunctionJarLauncher(archive);
launcher.deploy(functionRegistry, functionProperties, arguments.getSourceArgs());
this.running = false;
}
@Override
public void start() {
if (logger.isInfoEnabled()) {
logger.info("Deploying archive: " + functionProperties.getLocation());
}
deployer.deploy(functionRegistry, functionProperties, arguments.getSourceArgs());
this.running = true;
if (logger.isInfoEnabled()) {
logger.info("Successfully deployed archive: " + functionProperties.getLocation());
}
}
@Override
public boolean isRunning() {
return this.running;
}
};
}
}

View File

@@ -1,103 +0,0 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.junit.Test;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*/
public class ApplicationContainerTests {
@Test
public void testCustomApplicationContainerWithBootJar() throws Exception {
String[] args = new String[] {"--spring.cloud.function.location=target/it/bootjar/target/bootjar-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-class=function.example.UpperCaseFunction"};
JavaInvoker invokerByClass =
FunctionDeployerBootstrap.instance(args).run(JavaInvoker.class, args);
assertThat(invokerByClass.uppercaseSimple("bob")).isEqualTo("BOB");
assertThat(invokerByClass.uppercaseSimple("stacy")).isEqualTo("STACY");
}
@Test
public void testCustomApplicationContainerWithBootJarNoStartClass() throws Exception {
String[] args = new String[] {"--spring.cloud.function.location=target/it/bootjarnostart/target/bootjarnostart-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-class=function.example.UpperCaseFunction"};
JavaInvoker invokerByClass =
FunctionDeployerBootstrap.instance(args).run(JavaInvoker.class, args);
assertThat(invokerByClass.uppercaseSimple("bob")).isEqualTo("BOB");
assertThat(invokerByClass.uppercaseSimple("stacy")).isEqualTo("STACY");
}
@Test
public void testCustomApplicationContainerWithBootAppSimpleTypes() throws Exception {
String[] args = new String[] {"--spring.cloud.function.location=target/it/bootapp/target/bootapp-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-name=uppercase"};
JavaInvoker invokerByBean =
FunctionDeployerBootstrap.instance(args).run(JavaInvoker.class, args);
Message<byte[]> result = invokerByBean.uppercase(MessageBuilder.withPayload("\"bob\"".getBytes(StandardCharsets.UTF_8)).build());
assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\"");
}
@Test
public void testCustomApplicationContainerWithBootAppWithTypeConversion() throws Exception {
String[] args = new String[] {"--spring.cloud.function.location=target/it/bootapp/target/bootapp-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-name=uppercasePerson"};
JavaInvoker invokerByBean =
FunctionDeployerBootstrap.instance(args).run(JavaInvoker.class, args);
Message<byte[]> result = invokerByBean.uppercase(MessageBuilder.withPayload("{\"name\":\"bob\",\"id\":1}".getBytes(StandardCharsets.UTF_8)).build());
assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("{\"name\":\"BOB\",\"id\":1}");
}
private static class JavaInvoker extends ApplicationContainer {
JavaInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector,
FunctionProperties functionProperties) {
super(functionCatalog, functionInspector, functionProperties);
}
public Message<byte[]> uppercase(Message<byte[]> input) {
Function<Message<byte[]>, Message<byte[]>> functon = this.getFunction("application/json");
return functon.apply(input);
}
public String uppercaseSimple(String input) {
Function<String, String> functon = this.getFunction();
return functon.apply(input);
}
}
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;
import org.junit.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.ApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
/**
*
* @author Oleg Zhurakousky
* @since 3.0
*/
public class FunctionDeployerTests {
/*
* Target function `class UpperCaseFunction implements Function<String, String>`
* Main/Start class present, no Spring configuration
*/
@Test
public void testWithMainAndStartClassNoSpringConfiguration() throws Exception {
String[] args = new String[] {
"--spring.cloud.function.location=target/it/bootjar/target/bootjar-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-class=function.example.UpperCaseFunction" };
ApplicationContext context = SpringApplication.run(FunctionDeployerConfiguration.class, args);
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Function<String, String> function = catalog.lookup("upperCaseFunction");
assertThat(function.apply("bob")).isEqualTo("BOB");
assertThat(function.apply("stacy")).isEqualTo("STACY");
Function<Flux<String>, Flux<String>> functionAsFlux = catalog.lookup("upperCaseFunction");
List<String> results = functionAsFlux.apply(Flux.just("bob", "stacy")).collectList().block();
assertThat(results.get(0)).isEqualTo("BOB");
assertThat(results.get(1)).isEqualTo("STACY");
}
/*
* Target function `class UpperCaseFunction implements Function<String, String>`
* No Main/Start class present, no Spring configuration
*/
@Test
public void testNoMainAndNoStartClassAndNoSpringConfiguration() throws Exception {
String[] args = new String[] {
"--spring.cloud.function.location=target/it/bootjarnostart/target/bootjarnostart-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-class=function.example.UpperCaseFunction" };
ApplicationContext context = SpringApplication.run(FunctionDeployerConfiguration.class, args);
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Function<String, String> function = catalog.lookup("upperCaseFunction");
assertThat(function.apply("bob")).isEqualTo("BOB");
assertThat(function.apply("stacy")).isEqualTo("STACY");
Function<Flux<String>, Flux<String>> functionAsFlux = catalog.lookup("upperCaseFunction");
List<String> results = functionAsFlux.apply(Flux.just("bob", "stacy")).collectList().block();
assertThat(results.get(0)).isEqualTo("BOB");
assertThat(results.get(1)).isEqualTo("STACY");
}
/*
* Target function:
*
* @Bean public Function<String, String> uppercase()
*/
@Test
public void testWithMainAndStartClassAndSpringConfiguration() throws Exception {
String[] args = new String[] {
"--spring.cloud.function.location=target/it/bootapp/target/bootapp-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-name=uppercase" };
ApplicationContext context = SpringApplication.run(FunctionDeployerConfiguration.class, args);
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Function<Message<byte[]>, Message<byte[]>> function = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = function
.apply(MessageBuilder.withPayload("\"bob\"".getBytes(StandardCharsets.UTF_8)).build());
assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("\"BOB\"");
}
/*
* Target function:
*
* @Bean public Function<Person, Person> uppercasePerson()
*/
@Test
public void testWithMainAndStartClassAndSpringConfigurationAndTypeConversion() throws Exception {
String[] args = new String[] {
"--spring.cloud.function.location=target/it/bootapp/target/bootapp-0.0.1.BUILD-SNAPSHOT-exec.jar",
"--spring.cloud.function.function-name=uppercasePerson" };
ApplicationContext context = SpringApplication.run(FunctionDeployerConfiguration.class, args);
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Function<Message<byte[]>, Message<byte[]>> function = catalog.lookup("uppercasePerson", "application/json");
Message<byte[]> result = function.apply(
MessageBuilder.withPayload("{\"name\":\"bob\",\"id\":1}".getBytes(StandardCharsets.UTF_8)).build());
assertThat(new String(result.getPayload(), StandardCharsets.UTF_8)).isEqualTo("{\"name\":\"BOB\",\"id\":1}");
}
}