Replace deployer with a simpler library

Instead of an app, it is now a library with some utilities
(principally ApplicationBootstrap) for launching a Spring Boot
application, extracting a function, and registering it in the
FunctionRegistry.
This commit is contained in:
Dave Syer
2018-04-25 12:44:16 +01:00
parent 59f94c1533
commit 7fa0ed7b6b
28 changed files with 1769 additions and 1240 deletions

View File

@@ -0,0 +1,35 @@
<?xml version="1.0" encoding="UTF-8"?>
<settings>
<profiles>
<profile>
<id>it-repo</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<repositories>
<repository>
<id>local.central</id>
<url>@localRepositoryUrl@</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>local.central</id>
<url>@localRepositoryUrl@</url>
<releases>
<enabled>true</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</profile>
</profiles>
</settings>

View File

@@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>function-sample</artifactId>
<version>1.0.0.M1</version>
<packaging>jar</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.12.RELEASE</version>
<relativePath/>
</parent>
<properties>
<java.version>1.8</java.version>
<spring-cloud-function.version>1.0.0.BUILD-SNAPSHOT</spring-cloud-function.version>
<wrapper.version>1.0.10.RELEASE</wrapper.version>
<reactor.version>3.2.0.M1</reactor.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<version>${spring-cloud-function.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<classifier>exec</classifier>
</configuration>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/release</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/libs-snapshot-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/libs-milestone-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-releases</id>
<name>Spring Releases</name>
<url>https://repo.spring.io/libs-release-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>

View File

@@ -0,0 +1,27 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.functions;
import java.util.function.Consumer;
public class DoubleLogger implements Consumer<Integer> {
@Override
public void accept(Integer i) {
System.out.println(2 * i);
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2012-2015 the original author or authors.
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -14,23 +14,21 @@
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
package com.example.functions;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.junit.runners.Suite.SuiteClasses;
import java.util.function.Supplier;
/**
* A test suite for probing weird ordering problems in the tests.
*
* @author Dave Syer
* @author Eric Bottard
*/
@RunWith(Suite.class)
@SuiteClasses({ FunctionAppDeployerTests.class,
FunctionExtractingFunctionCatalogTests.class,
FunctionExtractingFunctionCatalogIntegrationTests.class })
@Ignore
public class AdhocTestSuite {
public class Emitter implements Supplier<String> {
private int i = 0;
private String[] values = {"one", "two", "three", "four"};
@Override
public String get() {
return values[i++ % values.length];
}
}

View File

@@ -0,0 +1,47 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.functions;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
/**
* @author Dave Syer
*/
@SpringBootApplication
public class FunctionApp {
@Bean
public DoubleLogger myDoubler() {
return new DoubleLogger();
}
@Bean
public Emitter myEmitter() {
return new Emitter();
}
@Bean
public LengthCounter myCounter() {
return new LengthCounter();
}
public static void main(String[] args) throws Exception {
SpringApplication.run(FunctionApp.class, args);
}
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example.functions;
import java.util.function.Function;
/**
* @author Eric Bottard
*/
public class LengthCounter implements Function<String, Integer> {
@Override
public Integer apply(String string) {
return string.length();
}
}

View File

@@ -0,0 +1,199 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.jar.JarFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.util.StringUtils;
/**
* Utility class to launch a Spring Boot application (optionally) in an isolated class
* loader. The class loader is created in such a way that it is mostly a copy of the
* current class loader (i.e. the one that loaded this class), but has a parent containing
* reactor-core (if present). It can then share the reactor dependency with other class
* loaders that the app itself creates, without any other classes being shared, other than
* the core JDK.
*
* @author Mark Fisher
* @author Dave Syer
*/
public class ApplicationBootstrap {
private static Log logger = LogFactory.getLog(ApplicationBootstrap.class);
private ApplicationRunner runner;
private URLClassLoader classLoader;
/**
* Run the provided main class as a Spring Boot application with the provided command
* line arguments.
*/
public void run(Class<?> mainClass, String... args) {
if (ApplicationBootstrap.isolated(args)) {
runner(mainClass).run(args);
}
else {
SpringApplication.run(mainClass, args);
}
}
/**
* Clean up the resources used by this instance, if any. Called automatically on a
* runtime shutdown hook.
*/
public void close() {
if (this.runner != null) {
this.runner.close();
this.runner = null;
}
if (this.classLoader != null) {
try {
this.classLoader.close();
}
catch (IOException e) {
throw new IllegalStateException("Cannot close ClassLoader", e);
}
finally {
this.classLoader = null;
}
}
}
private ApplicationRunner runner(Class<?> mainClass) {
if (this.runner == null) {
synchronized (this) {
if (this.runner == null) {
this.classLoader = createClassLoader();
this.runner = new ApplicationRunner(this.classLoader,
mainClass.getName());
Runtime.getRuntime().addShutdownHook(new Thread(this::close));
}
}
}
return this.runner;
}
private static boolean isolated(String[] args) {
for (String arg : args) {
if (arg.equals("--function.runner.isolated=false")) {
return false;
}
}
return true;
}
private URLClassLoader createClassLoader() {
URL[] urls = findClassPath();
if (urls.length == 1) {
URL[] classpath = extractClasspath(urls[0]);
if (classpath != null) {
urls = classpath;
}
}
List<URL> child = new ArrayList<>();
List<URL> parent = new ArrayList<>();
for (URL url : urls) {
child.add(url);
}
for (URL url : urls) {
if (isRoot(StringUtils.getFilename(clean(url.toString())))) {
parent.add(url);
child.remove(url);
}
}
logger.debug("Parent: " + parent);
logger.debug("Child: " + child);
ClassLoader base = getClass().getClassLoader();
if (!parent.isEmpty()) {
base = new URLClassLoader(parent.toArray(new URL[0]), base.getParent());
}
return new URLClassLoader(child.toArray(new URL[0]), base);
}
private URL[] findClassPath() {
ClassLoader base = getClass().getClassLoader();
if (!(base instanceof URLClassLoader)) {
try {
// Guess the classpath, based on where we can resolve existing resources
List<URL> list = Collections
.list(getClass().getClassLoader().getResources("META-INF"));
List<URL> result = new ArrayList<>();
result.add(
getClass().getProtectionDomain().getCodeSource().getLocation());
for (URL url : list) {
String path = url.toString();
path = path.substring(0, path.length() - "/META-INF".length());
if (path.endsWith("!")) {
path = path + "/";
}
result.add(new URL(path));
}
return result.toArray(new URL[result.size()]);
}
catch (IOException e) {
throw new IllegalStateException("Cannot find class path", e);
}
}
else {
@SuppressWarnings("resource")
URLClassLoader urlClassLoader = (URLClassLoader) base;
return urlClassLoader.getURLs();
}
}
private boolean isRoot(String file) {
return file.startsWith("reactor-core") || file.startsWith("reactive-streams");
}
private String clean(String jar) {
// This works with fat jars like Spring Boot where the path elements look like
// jar:file:...something.jar!/.
return jar.endsWith("!/") ? jar.substring(0, jar.length() - 2) : jar;
}
private URL[] extractClasspath(URL url) {
// This works for a jar indirection like in surefire and IntelliJ
if (url.toString().endsWith(".jar")) {
JarFile jar;
try {
jar = new JarFile(new File(url.toURI()));
String path = jar.getManifest().getMainAttributes()
.getValue("Class-Path");
if (path != null) {
List<URL> result = new ArrayList<>();
for (String element : path.split(" ")) {
result.add(new URL(element));
}
return result.toArray(new URL[0]);
}
}
catch (Exception e) {
}
}
return null;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,72 +13,56 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.eclipse.aether.graph.Dependency;
import org.springframework.boot.Banner.Mode;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.loader.thin.DependencyResolver;
import org.springframework.cloud.deployer.thin.ContextRunner;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.LiveBeansView;
import org.springframework.core.io.ClassPathResource;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.expression.spel.support.StandardTypeLocator;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
/**
* Driver class for running a Spring Boot application via an isolated classpath.
* Initialize an instance of this class with the class loader to be used and the name of
* the main class (usually a <code>@SpringBootApplication</code>), and then
* {@link #run(String...)} it, cleaning up with a call to {@link #close()}.
*
* @author Dave Syer
*
*/
// NOT a @Component (to prevent it from being scanned by the "main" application).
public class ApplicationRunner implements CommandLineRunner {
public class ApplicationRunner {
private static Log logger = LogFactory.getLog(ApplicationRunner.class);
public static void main(String[] args) {
new ApplicationRunner().start(args);
private final ClassLoader classLoader;
private final String source;
private StandardEvaluationContext app;
public ApplicationRunner(ClassLoader classLoader, String source) {
this.classLoader = classLoader;
this.source = source;
}
public ConfigurableApplicationContext start(String... args) {
return new SpringApplicationBuilder(ApplicationRunner.class).web(false)
.contextClass(AnnotationConfigApplicationContext.class)
.bannerMode(Mode.OFF).properties("spring.main.applicationContextClass="
+ AnnotationConfigApplicationContext.class.getName())
.run(args);
}
private Object app;
private ClassLoader classLoader;
@Override
public void run(String... args) {
ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
try {
this.classLoader = createClassLoader();
ClassUtils.overrideThreadContextClassLoader(this.classLoader);
Class<?> cls = this.classLoader.loadClass(ContextRunner.class.getName());
this.app = cls.newInstance();
runContext(DeployedFunctionApplication.class.getName(), Collections
.singletonMap(LiveBeansView.MBEAN_DOMAIN_PROPERTY_NAME, "deployer"),
this.app = new StandardEvaluationContext(cls.newInstance());
this.app.setTypeLocator(new StandardTypeLocator(this.classLoader));
runContext(this.source, defaultProperties(UUID.randomUUID().toString()),
args);
}
catch (Exception e) {
@@ -93,23 +77,102 @@ public class ApplicationRunner implements CommandLineRunner {
}
}
@PreDestroy
public void close() throws IOException {
closeContext();
if (this.classLoader!=null && this.classLoader instanceof Closeable) {
((Closeable) this.classLoader).close();
private Map<String, String> defaultProperties(String id) {
Map<String, String> map = new HashMap<>();
map.put(LiveBeansView.MBEAN_DOMAIN_PROPERTY_NAME, "function-invoker-" + id);
map.put("spring.jmx.default-domain", "function-invoker-" + id);
map.put("spring.jmx.enabled", "false");
return map;
}
public Object getBean(String name) {
if (this.app != null) {
if (containsBeanByName(name)) {
return getBeanByName(name);
}
try {
return getBeanByType(name);
}
catch (Exception e) {
// not there
}
}
this.classLoader = null;
return null;
}
private boolean containsBeanByName(String name) {
Expression parsed = new SpelExpressionParser()
.parseExpression("context.containsBean(\"" + name + "\")");
return parsed.getValue(this.app, Boolean.class);
}
private Object getBeanByName(String name) {
Expression parsed = new SpelExpressionParser()
.parseExpression("context.getBean(\"" + name + "\")");
return parsed.getValue(this.app);
}
private Object getBeanByType(String name) {
Expression parsed = new SpelExpressionParser()
.parseExpression("context.getBean(T(" + name + "))");
return parsed.getValue(this.app);
}
public boolean containsBean(String name) {
if (this.app != null) {
if (containsBeanByName(name)) {
return true;
}
Expression parsed = new SpelExpressionParser()
.parseExpression("context.getBeansOfType(T(" + name + "))");
try {
@SuppressWarnings("unchecked")
Map<String, Object> beans = (Map<String, Object>) parsed
.getValue(this.app);
return !beans.isEmpty();
}
catch (Exception e) {
}
}
return false;
}
public Object evaluate(String expression, Object root, Object... attrs) {
Expression parsed = new SpelExpressionParser().parseExpression(expression);
StandardEvaluationContext context = new StandardEvaluationContext(root);
if (attrs.length % 2 != 0) {
throw new IllegalArgumentException(
"Context attributes must be name, value pairs");
}
for (int i = 0; i < attrs.length / 2; i++) {
String name = (String) attrs[2 * i];
Object value = attrs[2 * i + 1];
context.setVariable(name, value);
}
return parsed.getValue(context);
}
public boolean isRunning() {
if (this.app == null) {
return false;
}
Expression parsed = new SpelExpressionParser()
.parseExpression("context.isRunning()");
return parsed.getValue(this.app, Boolean.class);
}
@PreDestroy
public void close() {
closeContext();
}
private RuntimeException getError() {
if (this.app == null) {
return null;
}
Method method = ReflectionUtils.findMethod(this.app.getClass(), "getError");
Throwable e;
e = (Throwable) ReflectionUtils.invokeMethod(method, this.app);
if (e==null) {
Expression parsed = new SpelExpressionParser().parseExpression("error");
Throwable e = parsed.getValue(this.app, Throwable.class);
if (e == null) {
return null;
}
if (e instanceof RuntimeException) {
@@ -120,57 +183,21 @@ public class ApplicationRunner implements CommandLineRunner {
private void runContext(String mainClass, Map<String, String> properties,
String... args) {
Method method = ReflectionUtils.findMethod(this.app.getClass(), "run",
String.class, Map.class, String[].class);
ReflectionUtils.invokeMethod(method, this.app, mainClass, properties, args);
Expression parsed = new SpelExpressionParser()
.parseExpression("run(#main,#properties,#args)");
StandardEvaluationContext context = this.app;
context.setVariable("main", mainClass);
context.setVariable("properties", properties);
context.setVariable("args", args);
parsed.getValue(context);
}
private void closeContext() {
Method method = ReflectionUtils.findMethod(this.app.getClass(), "close");
ReflectionUtils.invokeMethod(method, this.app);
}
private ClassLoader createClassLoader() {
ClassLoader base = getClass().getClassLoader();
if (!(base instanceof URLClassLoader)) {
throw new IllegalStateException("Need a URL class loader, found: " + base);
if (this.app != null) {
Expression parsed = new SpelExpressionParser().parseExpression("close()");
parsed.getValue(this.app);
this.app = null;
}
@SuppressWarnings("resource")
URLClassLoader urlClassLoader = (URLClassLoader) base;
URL[] urls = urlClassLoader.getURLs();
List<URL> child = new ArrayList<>();
List<URL> parent = new ArrayList<>();
for (URL url : urls) {
child.add(url);
}
List<File> resolved = resolveParent();
for (File archive : resolved) {
try {
URL url = archive.toURI().toURL();
parent.add(url);
child.remove(url);
}
catch (MalformedURLException e) {
throw new IllegalStateException("Cannot locate jar for: " + archive);
}
}
logger.info("Parent: " + parent);
logger.info("Child: " + child);
if (!parent.isEmpty()) {
base = new URLClassLoader(parent.toArray(new URL[0]), base.getParent());
}
return new URLClassLoader(child.toArray(new URL[0]), base);
}
private List<File> resolveParent() {
DependencyResolver resolver = DependencyResolver.instance();
List<Dependency> dependencies = resolver
.dependencies(new ClassPathResource("core-pom.xml"));
List<File> resolved = new ArrayList<>();
for (Dependency dependency : dependencies) {
resolved.add(resolver.resolve(dependency));
}
return resolved;
}
}

View File

@@ -0,0 +1,118 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
/**
* @author Dave Syer
*
*/
public class BeanCountingApplicationListener
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
public static final String MARKER = "Invoker app started";
private static Log logger = LogFactory.getLog(BeanCountingApplicationListener.class);
private ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext context) throws BeansException {
this.context = context;
}
@SuppressWarnings("resource")
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (!event.getApplicationContext().equals(this.context)) {
return;
}
int count = 0;
ConfigurableApplicationContext context = event.getApplicationContext();
String id = context.getId();
List<String> names = new ArrayList<>();
while (context != null) {
count += context.getBeanDefinitionCount();
names.addAll(Arrays.asList(context.getBeanDefinitionNames()));
context = (ConfigurableApplicationContext) context.getParent();
}
logger.info("Bean count: " + id + "=" + count);
logger.debug("Bean names: " + id + "=" + names);
try {
logger.info("Class count: " + id + "=" + ManagementFactory
.getClassLoadingMXBean().getTotalLoadedClassCount());
}
catch (Exception e) {
}
if (isSpringBootApplication(sources(event))) {
try {
logger.info(MARKER);
}
catch (Exception e) {
}
}
}
private boolean isSpringBootApplication(Set<Class<?>> sources) {
for (Class<?> source : sources) {
if (AnnotatedElementUtils.hasAnnotation(source,
SpringBootConfiguration.class)) {
return true;
}
}
return false;
}
private Set<Class<?>> sources(ApplicationReadyEvent event) {
Method method = ReflectionUtils.findMethod(SpringApplication.class,
"getAllSources");
if (method == null) {
method = ReflectionUtils.findMethod(SpringApplication.class, "getSources");
}
ReflectionUtils.makeAccessible(method);
@SuppressWarnings("unchecked")
Set<Object> objects = (Set<Object>) ReflectionUtils.invokeMethod(method,
event.getSpringApplication());
Set<Class<?>> result = new LinkedHashSet<>();
for (Object object : objects) {
if (object instanceof String) {
object = ClassUtils.resolveClassName((String) object, null);
}
result.add((Class<?>) object);
}
return result;
}
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.lang.reflect.Field;
import java.net.URL;
import java.util.Map;
import org.springframework.beans.BeanUtils;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
/**
* Utility class for starting a Spring Boot application in a separate thread. Best used
* from an isolated class loader, e.g. through {@link ApplicationRunner}.
*
* @author Dave Syer
*/
public class ContextRunner {
private ConfigurableApplicationContext context;
private Thread runThread;
private volatile boolean running = false;
private Throwable error;
private long timeout = 120000;
public void run(final String source, final Map<String, Object> properties,
final String... args) {
// Run in new thread to ensure that the context classloader is setup
this.runThread = new Thread(new Runnable() {
@Override
public void run() {
try {
resetUrlHandler();
StandardEnvironment environment = new StandardEnvironment();
environment.getPropertySources().addAfter(
StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME,
new MapPropertySource("appDeployer", properties));
running = true;
SpringApplicationBuilder builder = builder(
ClassUtils.resolveClassName(source, null));
if (ClassUtils.isPresent(
"org.springframework.cloud.stream.app.function.app.BeanCountingApplicationListener.BeanCountingApplicationListener()",
null)) {
builder.listeners(new BeanCountingApplicationListener());
}
context = builder.environment(environment).registerShutdownHook(false)
.run(args);
}
catch (Throwable ex) {
error = ex;
}
}
});
this.runThread.start();
try {
this.runThread.join(timeout);
this.running = context != null && context.isRunning();
}
catch (InterruptedException e) {
this.running = false;
Thread.currentThread().interrupt();
}
}
public void close() {
if (this.context != null) {
this.context.close();
resetUrlHandler();
}
// TODO: JDBC leak protection?
this.running = false;
this.runThread.setContextClassLoader(null);
this.runThread = null;
}
public ConfigurableApplicationContext getContext() {
return this.context;
}
private void resetUrlHandler() {
if (ClassUtils.isPresent(
"org.apache.catalina.webresources.TomcatURLStreamHandlerFactory", null)) {
setField(ClassUtils.resolveClassName(
"org.apache.catalina.webresources.TomcatURLStreamHandlerFactory",
null), "instance", null);
setField(URL.class, "factory", null);
}
}
private void setField(Class<?> type, String name, Object value) {
Field field = ReflectionUtils.findField(type, name);
ReflectionUtils.makeAccessible(field);
ReflectionUtils.setField(field, null, value);
}
public boolean isRunning() {
return running;
}
public Throwable getError() {
return this.error;
}
public static SpringApplicationBuilder builder(Class<?> type) {
// Defensive reflective builder to work with Boot 1.5 and 2.0
if (ClassUtils.hasConstructor(SpringApplicationBuilder.class, Class[].class)) {
return BeanUtils
.instantiateClass(
ClassUtils.getConstructorIfAvailable(
SpringApplicationBuilder.class, Class[].class),
(Object) new Class<?>[] { type });
}
return BeanUtils
.instantiateClass(
ClassUtils.getConstructorIfAvailable(
SpringApplicationBuilder.class, Object[].class),
(Object) new Object[] { type.getName() });
}
}

View File

@@ -1,84 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.HandlerInterceptor;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter;
/**
* @author Dave Syer
*
*/
@Component
public class DeployedApplicationFilter extends WebMvcConfigurerAdapter
implements HandlerInterceptor {
private final FunctionExtractingFunctionCatalog deployer;
@Autowired
public DeployedApplicationFilter(FunctionExtractingFunctionCatalog deployer) {
this.deployer = deployer;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(this);
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
String path = (String) request
.getAttribute(HandlerMapping.PATH_WITHIN_HANDLER_MAPPING_ATTRIBUTE);
if (path != null) {
// TODO: extract /stream to config property
if (path.startsWith("/stream")) {
String name = path.substring("/stream/".length());
if (name.contains("/")) {
name = name.substring(0, name.indexOf("/"));
}
if (deployer.deployed().containsKey(name)) {
return true;
}
else {
response.setStatus(HttpStatus.NOT_FOUND.value());
return false;
}
}
}
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response,
Object handler, ModelAndView modelAndView) throws Exception {
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception ex) throws Exception {
}
}

View File

@@ -1,66 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.Collections;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* @author Dave Syer
*
*/
@RestController
@RequestMapping("/admin")
public class FunctionAdminController {
private final FunctionExtractingFunctionCatalog deployer;
@Autowired
public FunctionAdminController(FunctionExtractingFunctionCatalog deployer) {
this.deployer = deployer;
}
@PostMapping(path = "/{name}")
public Map<String, Object> push(@PathVariable String name, @RequestParam String path)
throws Exception {
String id = deploy(name, path);
return Collections.singletonMap("id", id);
}
@DeleteMapping(path = "/{name}")
public Object undeploy(@PathVariable String name) throws Exception {
return deployer.undeploy(name);
}
@GetMapping({ "", "/" })
public Map<String, Object> deployed() {
return deployer.deployed();
}
private String deploy(String name, String path, String... args) throws Exception {
String deployed = deployer.deploy(name, path, args);
return deployed;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2016-2017 the original author or authors.
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,15 +13,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.IOException;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author Mark Fisher
* @author Dave Syer
*
*/
@SpringBootApplication
public class DeployedFunctionApplication {
public class FunctionApplication {
public static void main(String[] args) throws IOException {
new ApplicationBootstrap().run(FunctionApplication.class, args);
}
}

View File

@@ -0,0 +1,423 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.loader.JarLauncher;
import org.springframework.boot.loader.archive.Archive;
import org.springframework.boot.loader.archive.JarFileArchive;
import org.springframework.cloud.deployer.resource.maven.MavenProperties;
import org.springframework.cloud.deployer.resource.maven.MavenResource;
import org.springframework.cloud.deployer.resource.maven.MavenResourceLoader;
import org.springframework.cloud.deployer.resource.support.DelegatingResourceLoader;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.ResourceLoader;
import org.springframework.util.ClassUtils;
import org.springframework.util.StreamUtils;
/**
*
* Registers beans that will be picked up by spring-cloud-function-context magic. Sets up
* infrastructure capable of instantiating a "functional" bean (whether Supplier, Function
* or Consumer) loaded dynamically according to {@link FunctionProperties}.
*
* <p>
* Resolves jar location provided by the user using a flexible ResourceLoader.
* </p>
*
* @author Eric Bottard
* @author Mark Fisher
* @author Dave Syer
*/
@Configuration
@EnableConfigurationProperties
public class FunctionConfiguration {
private static Log logger = LogFactory.getLog(FunctionConfiguration.class);
@Autowired
private FunctionRegistry registry;
@Autowired
private FunctionProperties properties;
@Autowired
private DelegatingResourceLoader delegatingResourceLoader;
@Autowired
private ConfigurableApplicationContext context;
private BeanCreatorClassLoader functionClassLoader;
private BeanCreator creator;
@Bean
@ConfigurationProperties("maven")
public MavenProperties mavenProperties() {
return new MavenProperties();
}
@Bean
@ConfigurationProperties("function")
public FunctionProperties functionProperties() {
return new FunctionProperties();
}
@Bean
@ConditionalOnMissingBean(DelegatingResourceLoader.class)
public DelegatingResourceLoader delegatingResourceLoader(
MavenProperties mavenProperties) {
Map<String, ResourceLoader> loaders = new HashMap<>();
loaders.put(MavenResource.URI_SCHEME, new MavenResourceLoader(mavenProperties));
return new DelegatingResourceLoader(loaders);
}
/**
* Registers a function for each of the function classes passed into the
* {@link FunctionProperties}. They are named sequentially "function0", "function1",
* etc. The instances are created in an isolated class loader, so the jar they are
* packed in has to define all the dependencies (except core JDK).
*/
@PostConstruct
public void init() {
URL[] urls = Arrays.stream(properties.getLocation())
.flatMap(toResourceURL(delegatingResourceLoader)).toArray(URL[]::new);
try {
logger.info(
"Locating function from " + Arrays.asList(properties.getLocation()));
this.creator = new BeanCreator(expand(urls));
this.creator.run(properties.getMain());
Arrays.stream(properties.getBean()).map(this.creator::create).sequential()
.forEach(this.creator::register);
if (properties.getName().contains("|")) {
// A composite function has to be explicitly registered before it is
// looked up because we are using the SingleEntryFunctionRegistry
this.registry.lookup(Consumer.class, properties.getName());
this.registry.lookup(Function.class, properties.getName());
this.registry.lookup(Supplier.class, properties.getName());
}
}
catch (Exception e) {
throw new IllegalStateException("Cannot create functions", e);
}
}
private URL[] expand(URL[] urls) {
List<URL> result = new ArrayList<>();
for (URL url : urls) {
result.addAll(expand(url));
}
return result.toArray(new URL[0]);
}
private List<URL> expand(URL url) {
if (!"file".equals(url.getProtocol())) {
return Collections.singletonList(url);
}
if (!url.toString().endsWith(".jar")) {
return Collections.singletonList(url);
}
try {
JarFileArchive archive = new JarFileArchive(new File(url.toURI()));
return Arrays.asList(new ComputeLauncher(archive).getClassLoaderUrls());
}
catch (Exception e) {
throw new IllegalStateException("Cannot create class loader for " + url, e);
}
}
@PreDestroy
public void close() {
if (this.creator != null) {
this.creator.close();
}
if (this.functionClassLoader != null) {
try {
this.functionClassLoader.close();
this.functionClassLoader = null;
Runtime.getRuntime().gc();
}
catch (IOException e) {
throw new IllegalStateException("Cannot close function class loader", e);
}
}
}
private Function<String, Stream<URL>> toResourceURL(
DelegatingResourceLoader resourceLoader) {
return l -> {
if (l.equals("app:classpath")) {
return Stream
.of(((URLClassLoader) getClass().getClassLoader()).getURLs());
}
try {
return Stream.of(resourceLoader.getResource(l).getFile().toURI().toURL());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
};
}
private class ComputeLauncher extends JarLauncher {
public ComputeLauncher(JarFileArchive archive) {
super(archive);
}
public URL[] getClassLoaderUrls() throws Exception {
List<Archive> archives = getClassPathArchives();
if (archives.isEmpty()) {
return new URL[] { getArchive().getUrl() };
}
return archives.stream().map(archive -> {
try {
return archive.getUrl();
}
catch (MalformedURLException e) {
throw new IllegalStateException("Bad URL: " + archive, e);
}
}).collect(Collectors.toList()).toArray(new URL[0]);
}
}
/**
* Encapsulates the bean and spring application context creation concerns for
* functions. Creates a single application context if <code>run()</code> is called
* with a non-null main class, and then uses it to lookup a function (by name and then
* by type).
*/
private class BeanCreator {
private AtomicInteger counter = new AtomicInteger(0);
private ApplicationRunner runner;
public BeanCreator(URL[] urls) {
functionClassLoader = new BeanCreatorClassLoader(urls,
getClass().getClassLoader().getParent());
}
public void run(String main) {
if (main == null) {
return;
}
if (ClassUtils.isPresent(SpringApplication.class.getName(),
functionClassLoader)) {
logger.info("SpringApplication available. Bootstrapping: " + main);
ClassLoader contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(functionClassLoader);
try {
ApplicationRunner runner = new ApplicationRunner(functionClassLoader,
main);
// TODO: make the runtime properties configurable
runner.run("--spring.main.webEnvironment=false",
"--spring.cloud.stream.enabled=false",
"--spring.main.bannerMode=OFF",
"--spring.main.webApplicationType=none");
this.runner = runner;
}
finally {
ClassUtils.overrideThreadContextClassLoader(contextClassLoader);
}
}
else {
throw new IllegalStateException(
"SpringApplication not available and main class requested: "
+ main);
}
}
public Object create(String type) {
ClassLoader contextClassLoader = ClassUtils
.overrideThreadContextClassLoader(functionClassLoader);
AutowireCapableBeanFactory factory = context.getAutowireCapableBeanFactory();
try {
Object result = null;
if (this.runner != null) {
result = this.runner.getBean(type);
}
if (result == null) {
logger.info("No bean found. Instantiating: " + type);
if (ClassUtils.isPresent(type, functionClassLoader)) {
result = factory.createBean(
ClassUtils.resolveClassName(type, functionClassLoader));
}
}
if (result != null) {
logger.info("Located bean: " + type);
return result;
}
throw new IllegalStateException("Cannot create bean for: " + type);
}
finally {
ClassUtils.overrideThreadContextClassLoader(contextClassLoader);
}
}
public void register(Object bean) {
if (bean == null) {
return;
}
FunctionRegistration<Object> registration = new FunctionRegistration<Object>(
bean).names(
FunctionProperties.functionName(counter.getAndIncrement()));
if (this.runner != null) {
if (this.runner.containsBean(FunctionInspector.class.getName())) {
Object inspector = this.runner
.getBean(FunctionInspector.class.getName());
Class<?> input = (Class<?>) this.runner.evaluate(
"getInputType(#function)", inspector, "function", bean);
FunctionType type = FunctionType.from(input);
Class<?> output = findType("getOutputType", inspector, bean);
type = type.to(output);
if (((Boolean) this.runner.evaluate("isMessage(#function)", inspector,
"function", bean))) {
type = type.message();
}
Class<?> inputWrapper = findType("getInputWrapper", inspector, bean);
if (FunctionType.isWrapper(inputWrapper)) {
type = type.wrap(inputWrapper);
}
Class<?> outputWrapper = findType("getOutputWrapper", inspector,
bean);
if (FunctionType.isWrapper(outputWrapper)) {
type = type.wrap(outputWrapper);
}
registration.type(type.getType());
}
}
else {
registration.type(FunctionType.of(bean.getClass()).getType());
}
registration.target(bean);
registry.register(registration);
}
private Class<?> findType(String method, Object inspector, Object bean) {
return (Class<?>) this.runner.evaluate(method + "(#function)", inspector,
"function", bean);
}
public void close() {
if (this.runner != null) {
this.runner.close();
}
}
}
private static final class BeanCreatorClassLoader extends URLClassLoader {
private BeanCreatorClassLoader(URL[] urls, ClassLoader parent) {
super(urls, parent);
}
@Override
protected Class<?> loadClass(String name, boolean resolve)
throws ClassNotFoundException {
try {
return super.loadClass(name, resolve);
}
catch (ClassNotFoundException e) {
if (name.contains(ContextRunner.class.getName())) {
// Special case for the ContextRunner. We can re-use the bytes for it,
// and the function jar doesn't have to include them since it is only
// used here.
byte[] bytes;
try {
bytes = StreamUtils.copyToByteArray(
getClass().getClassLoader().getResourceAsStream(
ClassUtils.convertClassNameToResourcePath(name)
+ ".class"));
return defineClass(name, bytes, 0, bytes.length);
}
catch (IOException ex) {
throw new ClassNotFoundException(
"Cannot find runner class: " + name, ex);
}
}
throw e;
}
}
}
@Configuration
protected static class SingleEntryConfiguration implements BeanPostProcessor {
@Autowired
private Environment env;
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName)
throws BeansException {
String name = FunctionProperties
.functionName(env.getProperty("function.bean", ""));
if (bean instanceof FunctionRegistry && name.contains("|")) {
bean = new SingleEntryFunctionRegistry((FunctionRegistry) bean, name);
}
return bean;
}
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Dave Syer
*
*/
@Configuration
@ConditionalOnClass(FunctionExtractingFunctionCatalog.class)
@AutoConfigureBefore(ContextFunctionCatalogAutoConfiguration.class)
public class FunctionExtractingAutoConfiguration {
@Bean
public FunctionExtractingFunctionCatalog functionCatalog() {
return new FunctionExtractingFunctionCatalog();
}
}

View File

@@ -1,414 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.loader.thin.ArchiveUtils;
import org.springframework.cloud.deployer.spi.app.AppDeployer;
import org.springframework.cloud.deployer.spi.core.AppDefinition;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.thin.ThinJarAppDeployer;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.stream.config.SupplierInvokingMessageProducer;
import org.springframework.cloud.stream.binder.servlet.RouteRegistrar;
import org.springframework.context.support.LiveBeansView;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.util.MethodInvoker;
public class FunctionExtractingFunctionCatalog
implements FunctionCatalog, FunctionInspector, DisposableBean {
private static Log logger = LogFactory
.getLog(FunctionExtractingFunctionCatalog.class);
private RouteRegistrar routes;
private SupplierInvokingMessageProducer<?> producer;
private ThinJarAppDeployer deployer;
private Map<String, String> deployed = new LinkedHashMap<>();
private Map<String, String> names = new LinkedHashMap<>();
private Map<String, String> ids = new LinkedHashMap<>();
public FunctionExtractingFunctionCatalog() {
this("thin", "slim");
}
public FunctionExtractingFunctionCatalog(String name, String... profiles) {
deployer = new ThinJarAppDeployer(name, profiles);
}
@Autowired
public void setRouteRegistrar(RouteRegistrar routes) {
this.routes = routes;
}
@Autowired
public void setProducer(SupplierInvokingMessageProducer<?> producer) {
this.producer = producer;
}
@Override
public void destroy() throws Exception {
for (String name : new HashSet<>(names.keySet())) {
undeploy(name);
}
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
String name = getName(function);
if (name == null) {
return null;
}
return new FunctionRegistration<>(function).name(name)
.type(findType(function).getType());
}
private FunctionType findType(Object function) {
FunctionType type = FunctionType.from((Class<?>) type(function, "getInputType"))
.to((Class<?>) type(function, "getOutputType"))
.wrap((Class<?>) type(function, "getInputWrapper"));
if ((Boolean) type(function, "isMessage")) {
type = type.message();
}
return type;
}
@SuppressWarnings("unchecked")
@Override
public <T> T lookup(Class<?> type, String name) {
return (T) lookup(type, name, "lookup");
}
@SuppressWarnings("unchecked")
@Override
public Set<String> getNames(Class<?> type) {
return (Set<String>) getNames("getNames", type);
}
@Override
public String getName(Object function) {
Set<String> names = getNames(function);
return names.isEmpty() ? null : names.iterator().next();
}
public String deploy(String name, String path, String... args) {
Resource resource = new FileSystemResource(
ArchiveUtils.getArchiveRoot(ArchiveUtils.getArchive(path)));
AppDefinition definition = new AppDefinition(resource.getFilename(),
Collections.singletonMap(LiveBeansView.MBEAN_DOMAIN_PROPERTY_NAME,
"functions." + name));
AppDeploymentRequest request = new AppDeploymentRequest(definition, resource,
Collections.singletonMap(AppDeployer.GROUP_PROPERTY_KEY, "functions"),
Arrays.asList(args));
String id = this.deployer.deploy(request);
this.deployed.put(id, path);
this.names.put(name, id);
this.ids.put(id, name);
register(name);
return id;
}
public DeployedArtifact undeploy(String name) {
String id = this.names.get(name);
if (id == null) {
// TODO: Convert to 404
throw new IllegalStateException("No such app");
}
unregister(name);
this.deployer.undeploy(id);
String path = this.deployed.remove(id);
this.names.remove(name);
this.ids.remove(id);
return new DeployedArtifact(name, id, path);
}
private void register(String name) {
Set<String> names = getSupplierNames(name);
if (routes != null) {
logger.info("Registering routes: " + names);
routes.registerRoutes(getSupplierNames(name));
}
if (producer != null) {
// Need an ApplicationEvent that we can react to in the producer?
for (String supplier : names) {
producer.start(supplier);
}
}
}
@SuppressWarnings("unchecked")
private Set<String> getSupplierNames(String name) {
String id = this.names.get(name);
return (Set<String>) invoke(id, FunctionCatalog.class, "getNames",
Supplier.class);
}
private void unregister(String name) {
Set<String> names = getSupplierNames(name);
if (routes != null) {
logger.info("Unregistering routes: " + names);
routes.unregisterRoutes(names);
}
if (producer != null) {
for (String supplier : names) {
producer.stop(supplier);
}
}
}
private Set<String> getNames(Object arg) {
if (logger.isDebugEnabled()) {
logger.debug("Inspecting names");
}
@SuppressWarnings("unchecked")
Set<String> result = (Set<String>) invoke(FunctionInspector.class,
"getRegistration", this::extractNames, arg);
return result;
}
private Set<String> extractNames(String id, Object result) {
@SuppressWarnings("unchecked")
Set<String> prefixed = (Set<String>) prefix(id, invoke(result, "getNames"));
if (logger.isDebugEnabled()) {
logger.debug("Result (from " + this.ids.get(id) + "): " + prefixed);
}
if (prefixed.isEmpty()) {
return null;
}
return prefixed;
}
private Object type(Object arg, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Inspecting type " + method);
}
Object result = invoke(invoke(invoke(FunctionInspector.class, "getRegistration",
this::discardEmpty, arg), "getType"), method);
if (logger.isDebugEnabled()) {
logger.debug("Result: " + result);
}
return result;
}
private Object discardEmpty(String id, Object result) {
if (result == null || invoke(result, "getTarget") == null) {
return null;
}
return result;
}
private Object prefix(String id, Object result) {
String name = this.ids.get(id);
String prefix = name + "/";
if (result != null) {
if (result instanceof Collection) {
Set<String> results = new LinkedHashSet<>();
for (Object value : (Collection<?>) result) {
results.add(prefix + value);
}
return results;
}
else if (result instanceof String) {
if (logger.isDebugEnabled()) {
logger.debug("Prefixed (from \" + name + \"): " + result);
}
return prefix + result;
}
else {
return result;
}
}
return null;
}
private Object lookup(Class<?> type, String name, String method) {
if (logger.isDebugEnabled()) {
logger.debug("Looking up " + type + " named " + name + " with " + method);
}
return invoke(FunctionCatalog.class, method, type, name);
}
private Object getNames(String method, Class<?> type) {
if (logger.isDebugEnabled()) {
logger.debug("Calling " + method);
}
return invoke(FunctionCatalog.class, method, type);
}
private Object invoke(Class<?> type, String method, Object... arg) {
return invoke(type, method, null, arg);
}
private Object invoke(Class<?> type, String method, Callback<?> callback,
Object... arg) {
Set<Object> results = new LinkedHashSet<>();
Object fallback = null;
for (String id : this.deployed.keySet()) {
Object result = invoke(id, type, method, arg);
if (result instanceof Collection) {
results.addAll((Collection<?>) result);
continue;
}
if (result != null) {
if (result == Object.class) {
// Type fallback is Object
fallback = Object.class;
continue;
}
if (result instanceof Boolean && !((Boolean) result)) {
// Boolean fallback is false
fallback = false;
continue;
}
if (callback != null) {
result = callback.call(id, result);
if (result != null) {
return result;
}
continue;
}
return result;
}
}
if (fallback != null) {
return fallback;
}
if (logger.isDebugEnabled()) {
logger.debug("Results: " + results);
}
return "lookup".equals(method) ? null : results;
}
private Object invoke(String id, Class<?> type, String method, Object... arg) {
Object catalog = this.deployer.getBean(id, type);
if (catalog == null) {
return null;
}
String name = this.ids.get(id);
String prefix = name + "/";
if (arg.length == 2 && arg[0] instanceof Class) {
if (arg[1] instanceof String) {
String specific = arg[1].toString();
if (specific.startsWith(prefix)) {
arg[1] = specific.substring(prefix.length());
}
else {
return null;
}
}
}
try {
Object result = invoke(catalog, method, arg);
return prefix(id, result);
}
catch (Exception e) {
throw new IllegalStateException("Cannot extract", e);
}
}
private Object invoke(Object target, String method, Object... arg) {
MethodInvoker invoker = new MethodInvoker();
invoker.setTargetObject(target);
invoker.setTargetMethod(method);
invoker.setArguments(arg);
try {
invoker.prepare();
return invoker.invoke();
}
catch (Exception e) {
throw new IllegalStateException("Cannot invoke method", e);
}
}
public Map<String, Object> deployed() {
Map<String, Object> result = new LinkedHashMap<>();
for (String name : this.names.keySet()) {
String id = this.names.get(name);
result.put(name, new DeployedArtifact(name, id, this.deployed.get(id)));
}
return result;
}
interface Callback<T> {
T call(String id, Object result);
}
}
class DeployedArtifact {
private String name;
private String id;
private String path;
public DeployedArtifact() {
}
public DeployedArtifact(String name, String id, String path) {
this.name = name;
this.id = id;
this.path = path;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getPath() {
return path;
}
public void setPath(String path) {
this.path = path;
}
}

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.springframework.util.StringUtils;
/**
* Configuration properties for deciding how to locate the functional class to execute.
*
* @author Eric Bottard
*/
public class FunctionProperties {
/**
* Location(s) of jar archives containing the supplier/function/consumer class to run.
*/
private String[] location = new String[0];
/**
* The bean name or fully qualified class name of the supplier/function/consumer to
* run.
*/
private String[] bean = new String[0];
/**
* Optional main class from which to build a Spring application context
*/
private String main;
public String getName() {
return functionName(StringUtils.arrayToDelimitedString(bean, ","));
}
public String[] getBean() {
return bean;
}
public void setBean(String[] bean) {
this.bean = bean;
}
public String[] getLocation() {
return location;
}
public void setLocation(String[] location) {
this.location = location;
}
public String getMain() {
return main;
}
public void setMain(String main) {
this.main = main;
}
public static String functionName(String name) {
if (!name.contains(",")) {
return "function0";
}
List<String> names = new ArrayList<>();
for (int i = 0; i <= StringUtils.countOccurrencesOf(name, ","); i++) {
names.add("function" + i);
}
return StringUtils.collectionToDelimitedString(names, "|");
}
public static String functionName(int value) {
return "function" + value;
}
@PostConstruct
public void init() {
if (location.length == 0) {
throw new IllegalStateException(
"No archive location provided, please configure function.location as a jar or directory.");
}
if (bean.length == 0) {
throw new IllegalStateException(
"No function bean locator provided, please configure function.bean as a bean name or class name.");
}
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.Collections;
import java.util.Set;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
/**
* @author Dave Syer
*
*/
public class SingleEntryFunctionRegistry implements FunctionRegistry {
private final FunctionRegistry delegate;
private final String name;
public SingleEntryFunctionRegistry(FunctionRegistry delegate, String name) {
this.delegate = delegate;
this.name = name;
}
@Override
public <T> T lookup(Class<?> type, String name) {
return this.name.equals(name) ? this.delegate.lookup(type, name) : null;
}
@Override
public Set<String> getNames(Class<?> type) {
Set<String> names = this.delegate.getNames(type);
return names.contains(this.name) ? Collections.singleton(this.name)
: Collections.emptySet();
}
@Override
public <T> void register(FunctionRegistration<T> registration) {
this.delegate.register(registration);
}
}

View File

@@ -1,2 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.deployer.FunctionExtractingAutoConfiguration
org.springframework.cloud.function.deployer.FunctionConfiguration

View File

@@ -1,15 +0,0 @@
exclusions.spring-web-reactive: org.springframework:spring-web-reactive
exclusions.reator-netty: io.projectreactor.ipc:reactor-netty
exclusions.spring-cloud-stream: org.springframework.cloud:spring-cloud-stream
exclusions.spring-cloud-stream-reactive: org.springframework.cloud:spring-cloud-stream-reactive
exclusions.spring-cloud-stream-binder-servlet: org.springframework.cloud:spring-cloud-stream-binder-servlet
exclusions.spring-cloud-stream-binder-rabbit: org.springframework.cloud:spring-cloud-stream-binder-rabbit
exclusions.spring-cloud-stream-binder-kafka: org.springframework.cloud:spring-cloud-stream-binder-kafka
exclusions.spring-boot-starter-web: org.springframework.boot:spring-boot-starter-web
exclusions.spring-boot-starter-stream: org.springframework.boot:spring-boot-starter-stream
exclusions.spring-boot-starter-actuator: org.springframework.boot:spring-boot-starter-actuator
dependencies.spring-web: org.springframework:spring-web
dependencies.jackson-databind: com.fasterxml.jackson.core:jackson-databind
dependencies.spring-boot-starter: org.springframework.boot:spring-boot-starter
dependencies.spring-cloud-function-context: org.springframework.cloud:spring-cloud-function-context:1.0.0.BUILD-SNAPSHOT

View File

@@ -1,49 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-function-deployer</artifactId>
<packaging>jar</packaging>
<name>spring-cloud-function-deployer</name>
<description>Spring Cloud Function Web Support</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.11.RELEASE</version>
</parent>
<properties>
<reactor.version>3.1.4.RELEASE</reactor.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Edgware.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@@ -1,129 +0,0 @@
/*
* Copyright 2012-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.boot.loader.thin.ArchiveUtils;
import org.springframework.boot.loader.tools.LogbackInitializer;
import org.springframework.cloud.deployer.spi.app.DeploymentState;
import org.springframework.cloud.deployer.spi.core.AppDefinition;
import org.springframework.cloud.deployer.spi.core.AppDeploymentRequest;
import org.springframework.cloud.deployer.thin.ThinJarAppDeployer;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.util.StringUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
* @author Mark Fisher
*/
@RunWith(Parameterized.class)
public class FunctionAppDeployerTests {
static {
LogbackInitializer.initialize();
}
private static ThinJarAppDeployer deployer = new ThinJarAppDeployer();
@BeforeClass
public static void skip() {
try {
ArchiveUtils.getArchiveRoot(ArchiveUtils
.getArchive("maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT"));
}
catch (Exception e) {
Assume.assumeNoException(
"Could not locate jar for tests. Please build spring-cloud-function locally first.",
e);
}
}
@Parameterized.Parameters
public static List<Object[]> data() {
// Repeat a couple of times to ensure it's consistent
return Arrays.asList(new Object[2][0]);
}
@Test
public void directory() throws Exception {
String first = deploy("file:../spring-cloud-function-samples/function-sample/target/classes", "",
"--spring.cloud.function.stream.supplier.enabled=false");
// Deployment is blocking so it either failed or succeeded.
assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed);
deployer.undeploy(first);
}
@Test
public void web() throws Exception {
String first = deploy("maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT", "",
"--spring.cloud.function.stream.supplier.enabled=false");
// Deployment is blocking so it either failed or succeeded.
assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed);
deployer.undeploy(first);
}
@Test
public void stream() throws Exception {
String first = deploy("maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT",
"spring.cloud.deployer.thin.profile=rabbit",
"--spring.cloud.function.stream.supplier.enabled=false", "--debug=true");
// Deployment is blocking so it either failed or succeeded.
assertThat(deployer.status(first).getState()).isEqualTo(DeploymentState.deployed);
deployer.undeploy(first);
}
private String deploy(String jarName, String properties, String... args)
throws Exception {
Resource resource = new FileSystemResource(
ArchiveUtils.getArchiveRoot(ArchiveUtils.getArchive(jarName)));
AppDefinition definition = new AppDefinition(resource.getFilename(),
Collections.emptyMap());
AppDeploymentRequest request = new AppDeploymentRequest(definition, resource,
properties(properties), Arrays.asList(args));
String deployed = deployer.deploy(request);
return deployed;
}
private Map<String, String> properties(String properties) {
Map<String, String> map = new LinkedHashMap<>();
Properties props = StringUtils.splitArrayElementsIntoProperties(
StringUtils.commaDelimitedListToStringArray(properties), "=");
if (props != null) {
for (Object name : props.keySet()) {
String key = (String) name;
map.put(key, props.getProperty(key));
}
}
return map;
}
}

View File

@@ -0,0 +1,120 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FunctionConfiguration.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@TestPropertySource(properties = {
"function.location=file:target/it/support/target/function-sample-1.0.0.M1.jar", })
public abstract class FunctionConfigurationTests {
@Autowired
protected MessageCollector messageCollector;
@EnableAutoConfiguration
@TestPropertySource(properties = { "function.bean=com.example.functions.Emitter" })
public static class SourceTests extends FunctionConfigurationTests {
@Autowired
private Source source;
@Test
public void test() throws Exception {
Message<?> received = messageCollector.forChannel(source.output()).poll(2,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is("one"));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = {
"function.bean=com.example.functions.Emitter,com.example.functions.LengthCounter" })
public static class CompositeTests extends FunctionConfigurationTests {
@Autowired
private Source source;
@Test
public void test() throws Exception {
Message<?> received = messageCollector.forChannel(source.output()).poll(2,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is(3));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = {
"function.bean=com.example.functions.LengthCounter" })
public static class ProcessorTests extends FunctionConfigurationTests {
@Autowired
private Processor processor;
@Test
public void test() throws Exception {
processor.input().send(MessageBuilder.withPayload("hello").build());
Message<?> received = messageCollector.forChannel(processor.output()).poll(1,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is("hello".length()));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = {
"function.bean=com.example.functions.DoubleLogger" })
public static class SinkTests extends FunctionConfigurationTests {
@Autowired
private Sink sink;
@Test
public void test() throws Exception {
// Can't assert side effects.
sink.input().send(MessageBuilder.withPayload(5).build());
}
}
}

View File

@@ -1,130 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.net.URI;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
public class FunctionExtractingFunctionCatalogIntegrationTests {
private static ConfigurableApplicationContext context;
private static int port;
@BeforeClass
public static void open() throws Exception {
port = SocketUtils.findAvailableTcpPort();
// System.setProperty("debug", "true");
context = new ApplicationRunner().start("--server.port=" + port, "--debug",
"--logging.level.org.springframework.cloud.function=DEBUG");
deploy("sample", "maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT");
}
private static void deploy(String name, String path) throws Exception {
ResponseEntity<String> result = new TestRestTemplate().postForEntity(
"http://localhost:" + port + "/admin/" + name + "?path=" + path, "",
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
}
private static String undeploy(String name) throws Exception {
ResponseEntity<String> result = new TestRestTemplate().exchange(RequestEntity
.delete(new URI("http://localhost:" + port + "/admin/" + name)).build(),
String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.OK);
return result.getBody();
}
@AfterClass
public static void close() {
if (context != null) {
context.close();
}
}
@Test
public void listing() {
assertThat(new TestRestTemplate()
.getForObject("http://localhost:" + port + "/admin", String.class))
.startsWith("{").contains("sample");
}
@Test
public void words() {
assertThat(new TestRestTemplate().getForObject(
"http://localhost:" + port + "/stream/sample/words", String.class))
.isEqualTo("[\"foo\",\"bar\"]");
}
@Test
public void missing() throws Exception {
ResponseEntity<String> result = new TestRestTemplate().exchange(RequestEntity
.get(new URI("http://localhost:" + port + "/stream/missing/words"))
.build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
public void uppercase() throws Exception {
ResponseEntity<String> result = new TestRestTemplate().exchange(RequestEntity
.post(new URI("http://localhost:" + port + "/stream/sample/uppercase"))
.contentType(MediaType.TEXT_PLAIN)
.body("foo"), String.class);
assertThat(result.getBody()).isEqualTo("FOO");
}
@Test
public void another() throws Exception {
deploy("pof",
"maven://io.spring.sample:function-sample-pof:jar:exec:1.0.0.BUILD-SNAPSHOT");
assertThat(new TestRestTemplate().postForObject(
"http://localhost:" + port + "/stream/pof/greeter", "Foo",
String.class)).isEqualTo("Hello Foo");
}
@Test
public void cycle() throws Exception {
String undeploy = undeploy("sample");
assertThat(undeploy.contains("\"name\":\"sample\""));
assertThat(undeploy.contains(
"\"path\":\"maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT\""));
ResponseEntity<String> result = new TestRestTemplate().exchange(RequestEntity
.get(new URI("http://localhost:" + port + "/stream/sample/words"))
.build(), String.class);
assertThat(result.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
deploy("sample", "maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT");
assertThat(new TestRestTemplate().postForObject(
"http://localhost:" + port + "/stream/sample/uppercase", "foo",
String.class)).isEqualTo("FOO");
}
}

View File

@@ -1,129 +0,0 @@
/*
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.boot.loader.tools.LogbackInitializer;
import static org.assertj.core.api.Assertions.assertThat;
import reactor.core.publisher.Flux;
/**
* @author Dave Syer
*
*/
public class FunctionExtractingFunctionCatalogTests {
private static String id;
static {
LogbackInitializer.initialize();
}
private static FunctionExtractingFunctionCatalog deployer = new FunctionExtractingFunctionCatalog();
@Rule
public ExpectedException expected = ExpectedException.none();
@Before
public void init() throws Exception {
if (id == null) {
deploy("sample",
"maven://io.spring.sample:function-sample:1.0.0.BUILD-SNAPSHOT");
// "--debug");
id = deploy("pojos",
"maven://io.spring.sample:function-sample-pojo:1.0.0.BUILD-SNAPSHOT");
}
}
@AfterClass
public static void close() {
if (id != null) {
deployer.undeploy("sample");
deployer.undeploy("pojos");
}
}
@Test
public void listFunctions() throws Exception {
assertThat(deployer.getNames(Function.class)).contains("sample/uppercase",
"pojos/uppercase");
}
@Test
public void nameFunction() throws Exception {
assertThat(deployer.getName(deployer.lookup(Function.class, "sample/uppercase")))
.isEqualTo("sample/uppercase");
}
@Test
public void deployAndExtractFunctions() throws Exception {
// This one can only work if you change the boot classpath to contain reactor-core
// and reactive-streams
expected.expect(ClassCastException.class);
Function<Flux<String>, Flux<String>> function = deployer.lookup(Function.class,
"pojos/uppercase");
Flux<String> result = function.apply(Flux.just("foo"));
assertThat(result.blockFirst()).isEqualTo("FOO");
}
@Test
public void listConsumers() throws Exception {
assertThat(deployer.getNames(Consumer.class)).isEmpty();
}
@Test
public void deployAndExtractConsumers() throws Exception {
assertThat(deployer.<Consumer<?>>lookup(Consumer.class, "pojos/sink")).isNull();
}
@Test
public void listSuppliers() throws Exception {
assertThat(deployer.getNames(Supplier.class)).contains("sample/words",
"pojos/words");
}
@Test
public void nameSupplier() throws Exception {
assertThat(deployer.getName(deployer.lookup(Supplier.class, "sample/words")))
.isEqualTo("sample/words");
}
@Test
public void deployAndExtractSuppliers() throws Exception {
assertThat(deployer.<Supplier<?>>lookup(Supplier.class, "sample/words"))
.isNotNull();
assertThat(deployer.<Supplier<?>>lookup(Supplier.class, "pojos/words"))
.isNotNull();
}
private static String deploy(String name, String path, String... args)
throws Exception {
String deployed = deployer.deploy(name, path, args);
return deployed;
}
}

View File

@@ -0,0 +1,121 @@
/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.deployer;
import java.util.concurrent.TimeUnit;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
import static org.junit.Assert.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = FunctionConfiguration.class, webEnvironment = SpringBootTest.WebEnvironment.NONE)
@TestPropertySource(properties = {
"function.location=file:target/it/support/target/function-sample-1.0.0.M1-exec.jar", })
public abstract class SpringFunctionAppConfigurationTests {
@Autowired
protected MessageCollector messageCollector;
@EnableAutoConfiguration
@TestPropertySource(properties = { "function.bean=myEmitter",
"function.main=com.example.functions.FunctionApp" })
public static class SourceTests extends SpringFunctionAppConfigurationTests {
@Autowired
private Source source;
@Test
public void test() throws Exception {
Message<?> received = messageCollector.forChannel(source.output()).poll(2,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is("one"));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = { "function.bean=myEmitter,myCounter",
"function.main=com.example.functions.FunctionApp" })
public static class CompositeTests extends SpringFunctionAppConfigurationTests {
@Autowired
private Source source;
@Test
public void test() throws Exception {
Message<?> received = messageCollector.forChannel(source.output()).poll(2,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is(3));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = { "function.bean=myCounter",
"function.main=com.example.functions.FunctionApp" })
public static class ProcessorTests extends SpringFunctionAppConfigurationTests {
@Autowired
private Processor processor;
@Test
public void test() throws Exception {
processor.input().send(MessageBuilder.withPayload("hello").build());
Message<?> received = messageCollector.forChannel(processor.output()).poll(1,
TimeUnit.SECONDS);
assertThat(received.getPayload(), Matchers.is("hello".length()));
}
}
@EnableAutoConfiguration
@TestPropertySource(properties = { "function.bean=myDoubler",
"function.main=com.example.functions.FunctionApp" })
public static class SinkTests extends SpringFunctionAppConfigurationTests {
@Autowired
private Sink sink;
@Test
public void test() throws Exception {
// Can't assert side effects.
sink.input().send(MessageBuilder.withPayload(5).build());
}
}
}