diff --git a/README.adoc b/README.adoc index 579d70b3c..73425850e 100644 --- a/README.adoc +++ b/README.adoc @@ -1,7 +1,7 @@ == Register a Function: ---- -./register.sh -n uppercase -f "f->f.map(s->s.toString().toUpperCase())" +./registerFunction.sh -n uppercase -f "f->f.map(s->s.toString().toUpperCase())" ---- == Run a Stream Processing Microservice using that Function: @@ -21,9 +21,21 @@ (assuming the `uppercase` function was already registered as above) ---- -./register.sh -n pluralize -f "f->f.map(s->s+\"S\")" +./registerFunction.sh -n pluralize -f "f->f.map(s->s+\"S\")" ./web.sh -p /words -f uppercase,pluralize ---- +== Run a Task Microservice using a Supplier, Function, and Consumer: + +(assuming the `uppercase` function was already registered as above) + +---- +./registerSupplier.sh -n words -f "()->Flux.just(\"foo\",\"bar\")" + +./registerConsumer.sh -n print -f "System.out::println" + +./task.sh -s words -f uppercase -c print +---- + (more docs soon) diff --git a/pom.xml b/pom.xml index 68f68f908..b014487a7 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ spring-cloud-function-compiler spring-cloud-function-core spring-cloud-function-stream + spring-cloud-function-task spring-cloud-function-web diff --git a/scripts/register.sh b/scripts/registerConsumer.sh similarity index 80% rename from scripts/register.sh rename to scripts/registerConsumer.sh index a365fc9dc..8b46d514a 100755 --- a/scripts/register.sh +++ b/scripts/registerConsumer.sh @@ -11,6 +11,6 @@ while getopts ":n:f:" opt; do esac done -java -jar ../spring-cloud-function-core/target/spring-cloud-function-core-1.0.0.BUILD-SNAPSHOT-registrar.jar\ +java -jar ../spring-cloud-function-core/target/spring-cloud-function-core-1.0.0.BUILD-SNAPSHOT-registrar.jar consumer\ $NAME\ $FUNC diff --git a/scripts/registerFunction.sh b/scripts/registerFunction.sh new file mode 100755 index 000000000..aa95fb5d3 --- /dev/null +++ b/scripts/registerFunction.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +while getopts ":n:f:" opt; do + case $opt in + n) + NAME=$OPTARG + ;; + f) + FUNC=$OPTARG + ;; + esac +done + +java -jar ../spring-cloud-function-core/target/spring-cloud-function-core-1.0.0.BUILD-SNAPSHOT-registrar.jar function\ + $NAME\ + $FUNC diff --git a/scripts/registerSupplier.sh b/scripts/registerSupplier.sh new file mode 100755 index 000000000..242c47d97 --- /dev/null +++ b/scripts/registerSupplier.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +while getopts ":n:f:" opt; do + case $opt in + n) + NAME=$OPTARG + ;; + f) + FUNC=$OPTARG + ;; + esac +done + +java -jar ../spring-cloud-function-core/target/spring-cloud-function-core-1.0.0.BUILD-SNAPSHOT-registrar.jar supplier\ + $NAME\ + $FUNC diff --git a/scripts/task.sh b/scripts/task.sh new file mode 100755 index 000000000..8fbcb5591 --- /dev/null +++ b/scripts/task.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +while getopts ":s:f:c:" opt; do + case $opt in + s) + SUPP=$OPTARG + ;; + f) + FUNC=$OPTARG + ;; + c) + CONS=$OPTARG + ;; + esac +done + +java -jar ../spring-cloud-function-task/target/spring-cloud-function-task-1.0.0.BUILD-SNAPSHOT.jar\ + --lambda.supplier=$SUPP --lambda.function=$FUNC --lambda.consumer=$CONS diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java new file mode 100644 index 000000000..7471306e0 --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java @@ -0,0 +1,143 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.List; +import java.util.regex.Matcher; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.springframework.cloud.function.compiler.java.CompilationFailedException; +import org.springframework.cloud.function.compiler.java.CompilationMessage; +import org.springframework.cloud.function.compiler.java.CompilationResult; +import org.springframework.cloud.function.compiler.java.RuntimeJavaCompiler; + +/** + * @author Andy Clement + * @author Mark Fisher + */ +abstract class AbstractFunctionCompiler { + + private static Logger logger = LoggerFactory.getLogger(AbstractFunctionCompiler.class); + + // Newlines in the property are escaped + private static final String NEWLINE_ESCAPE = Matcher.quoteReplacement("\\n"); + + // Individual double-quote characters are represented by two double quotes in the DSL + private static final String DOUBLE_DOUBLE_QUOTE = Matcher.quoteReplacement("\"\""); + + /** + * The user supplied code snippet is inserted into the template and then the result is compiled + */ + private static String SOURCE_CODE_TEMPLATE = + "package " + AbstractFunctionCompiler.class.getPackage().getName() + ";\n" + + "import java.util.*;\n" + // Helpful to include this + "import java.util.function.*;\n" + + "import reactor.core.publisher.Flux;\n" + + "public class %s implements %sFactory {\n" + + " public %s<%s> getResult() {\n" + + " %s\n" + + " }\n" + + "}\n"; + + static enum ResultType { Consumer, Function, Supplier } + + private final ResultType resultType; + + private final String parameterizedTypes; + + private final RuntimeJavaCompiler compiler = new RuntimeJavaCompiler(); + + AbstractFunctionCompiler(ResultType type, String parameterizedTypes) { + this.resultType = type; + this.parameterizedTypes = parameterizedTypes; + } + + /** + * Produce a factory instance by: + * + * @return a factory instance + */ + public CompiledFunctionFactory compile(String name, String code) { + if (name == null || name.length() == 0) { + throw new IllegalArgumentException("name must not be empty"); + } + logger.info("Initial code property value :'{}'", code); + code = decode(code); + if (code.startsWith("\"") && code.endsWith("\"")) { + code = code.substring(1,code.length()-1); + } + if (!code.startsWith("return ") && !code.endsWith(";")) { + code = String.format("return (%s<%s> & java.io.Serializable) %s;", resultType, this.parameterizedTypes, code); + } + logger.info("Processed code property value :\n{}\n", code); + String firstLetter = name.substring(0, 1).toUpperCase(); + name = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter; + String className = String.format("%s.%s%sFactory", this.getClass().getPackage().getName(), name, resultType); + CompilationResult compilationResult = buildAndCompileSourceCode(className, code); + if (compilationResult.wasSuccessful()) { + return new CompiledFunctionFactory(className, compilationResult); + } + List compilationMessages = compilationResult.getCompilationMessages(); + throw new CompilationFailedException(compilationMessages); + } + + /** + * Create the source for and then compile and load a class that embodies + * the supplied methodBody. The methodBody is inserted into a class template that + * returns a Function<Flux<Object>,Flux<Object>>. + * This method can return more than one class if the method body includes local class + * declarations. An example methodBody would be return input -> input.buffer(5).map(list->list.get(0));. + * + * @param className the name of the class + * @param methodBody the source code for a method that should return a + * Function<Flux<Object>,Flux<Object>> + * @return the list of Classes produced by compiling and then loading the snippet of code + */ + private CompilationResult buildAndCompileSourceCode(String className, String methodBody) { + String sourceCode = makeSourceClassDefinition(className, methodBody); + return compiler.compile(className, sourceCode); + } + + private static String decode(String input) { + return input.replaceAll(NEWLINE_ESCAPE, "\n").replaceAll(DOUBLE_DOUBLE_QUOTE, "\""); + } + + /** + * Make a full source code definition for a class by applying the specified method body + * to the Reactive template. + * + * @param className the name of the class + * @param methodBody the code to insert into the Reactive source class template + * @return a complete Java Class definition + */ + private String makeSourceClassDefinition(String className, String methodBody) { + String shortClassName = className.substring(className.lastIndexOf('.') + 1); + String s = String.format(SOURCE_CODE_TEMPLATE, shortClassName, resultType, resultType, this.parameterizedTypes, methodBody); + System.out.println(s); + return s; + } + +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompilationResultFactory.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompilationResultFactory.java new file mode 100644 index 000000000..af3946a91 --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompilationResultFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +/** + * @author Mark Fisher + */ +public interface CompilationResultFactory { + + T getResult(); + +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompiledFunctionFactory.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompiledFunctionFactory.java index 26938862e..22de0bd39 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompiledFunctionFactory.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/CompiledFunctionFactory.java @@ -17,43 +17,42 @@ package org.springframework.cloud.function.compiler; import java.util.List; -import java.util.function.Function; import org.springframework.cloud.function.compiler.java.CompilationResult; /** * @author Mark Fisher */ -public class CompiledFunctionFactory implements FunctionFactory { +public class CompiledFunctionFactory implements CompilationResultFactory { - private final Function function; + private final T result; private final byte[] generatedClassBytes; public CompiledFunctionFactory(String className, CompilationResult compilationResult) { List> clazzes = compilationResult.getCompiledClasses(); - Function function = null; + T result = null; for (Class clazz: clazzes) { if (clazz.getName().equals(className)) { try { @SuppressWarnings("unchecked") - FunctionFactory functionFactory = (FunctionFactory) clazz.newInstance(); - function = functionFactory.getFunction(); + CompilationResultFactory factory = (CompilationResultFactory) clazz.newInstance(); + result = factory.getResult(); } catch (Exception e) { throw new IllegalArgumentException("Unexpected problem during retrieval of Function from compiled class", e); } } } - if (function == null) { - throw new IllegalArgumentException("Failed to extract Function from compilation result."); + if (result == null) { + throw new IllegalArgumentException("Failed to extract compilation result."); } - this.function = function; + this.result = result; this.generatedClassBytes = compilationResult.getClassBytes(className); } - public Function getFunction() { - return function; + public T getResult() { + return result; } public byte[] getGeneratedClassBytes() { diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerCompiler.java new file mode 100644 index 000000000..0b79ee495 --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerCompiler.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.function.Consumer; + +/** + * @author Mark Fisher + */ +public class ConsumerCompiler extends AbstractFunctionCompiler> { + + public ConsumerCompiler() { + super(ResultType.Consumer, "Object"); + } +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerFactory.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerFactory.java new file mode 100644 index 000000000..babe13f9a --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/ConsumerFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.function.Consumer; + +/** + * @author Mark Fisher + */ +public interface ConsumerFactory extends CompilationResultFactory> { + +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/Example.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/Example.java new file mode 100644 index 000000000..2fd1bd0e4 --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/Example.java @@ -0,0 +1,43 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +public class Example { + + public static void main(String[] args) { + SupplierCompiler> supplierCompiler = new SupplierCompiler<>(); + CompiledFunctionFactory>> supplierFactory = supplierCompiler.compile("s", "return ()->Flux.just(\"foo\");"); + Flux input = supplierFactory.getResult().get(); + + FunctionCompiler, Flux> functionCompiler = new FunctionCompiler<>(); + CompiledFunctionFactory,Flux>> functionFactory = functionCompiler.compile("f", "f->f.map(s->s.toString().toUpperCase())"); + Flux output = functionFactory.getResult().apply(input); + + ConsumerCompiler> consumerCompiler = new ConsumerCompiler<>(); + CompiledFunctionFactory>> consumerFactory = consumerCompiler.compile("c", "f->f.subscribe(System.out::println)"); + consumerFactory.getResult().accept(output); + } +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java index 19e81f148..a896e90b2 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java @@ -1,12 +1,12 @@ /* * Copyright 2016 the original author or authors. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,115 +16,14 @@ package org.springframework.cloud.function.compiler; -import java.util.List; -import java.util.regex.Matcher; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.cloud.function.compiler.java.CompilationFailedException; -import org.springframework.cloud.function.compiler.java.CompilationMessage; -import org.springframework.cloud.function.compiler.java.CompilationResult; -import org.springframework.cloud.function.compiler.java.RuntimeJavaCompiler; +import java.util.function.Function; /** - * @author Andy Clement * @author Mark Fisher */ -public class FunctionCompiler { +public class FunctionCompiler extends AbstractFunctionCompiler> { - private static Logger logger = LoggerFactory.getLogger(FunctionCompiler.class); - - // Newlines in the property are escaped - private static final String NEWLINE_ESCAPE = Matcher.quoteReplacement("\\n"); - - // Individual double-quote characters are represented by two double quotes in the DSL - private static final String DOUBLE_DOUBLE_QUOTE = Matcher.quoteReplacement("\"\""); - - /** - * The user supplied code snippet is inserted into the template and then the result is compiled - */ - private static String SOURCE_CODE_TEMPLATE = - "package " + FunctionCompiler.class.getPackage().getName() + ";\n" + - "import java.util.*;\n" + // Helpful to include this - "import java.util.function.*;\n" + - "import reactor.core.publisher.Flux;\n" + - "public class %s implements FunctionFactory {\n" + - " public Function, Flux> getFunction() {\n" + - " %s\n" + - " }\n" + - "}\n"; - - private final RuntimeJavaCompiler compiler = new RuntimeJavaCompiler(); - - /** - * Produce a CompiledFunctionFactory instance by:
    - *
  • Decoding the code String to process any newlines/double-double-quotes - *
  • Insert the code into the source code template for a class - *
  • Compiling the class using the JDK provided Java Compiler - *
  • Loading the compiled class - *
  • Invoking a well known method on the factory class to produce a Function instance - *
  • Returning that instance. - *
- * - * @return a CompiledFunctionFactory instance - */ - public CompiledFunctionFactory compile(String name, String code) { - if (name == null || name.length() == 0) { - throw new IllegalArgumentException("name must not be empty"); - } - logger.info("Initial code property value :'{}'", code); - code = decode(code); - if (code.startsWith("\"") && code.endsWith("\"")) { - code = code.substring(1,code.length()-1); - } - if (!code.startsWith("return ") && !code.endsWith(";")) { - code = "return (Function,Flux> & java.io.Serializable) " + code + ";"; - } - logger.info("Processed code property value :\n{}\n", code); - String firstLetter = name.substring(0, 1).toUpperCase(); - name = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter; - String className = String.format("%s.%sFunctionFactory", this.getClass().getPackage().getName(), name); - CompilationResult compilationResult = buildAndCompileSourceCode(className, code); - if (compilationResult.wasSuccessful()) { - return new CompiledFunctionFactory<>(className, compilationResult); - } - List compilationMessages = compilationResult.getCompilationMessages(); - throw new CompilationFailedException(compilationMessages); + public FunctionCompiler() { + super(ResultType.Function, "Flux, Flux"); } - - /** - * Create the source for and then compile and load a class that embodies - * the supplied methodBody. The methodBody is inserted into a class template that - * returns a Function<Flux<Object>,Flux<Object>>. - * This method can return more than one class if the method body includes local class - * declarations. An example methodBody would be return input -> input.buffer(5).map(list->list.get(0));. - * - * @param className the name of the class - * @param methodBody the source code for a method that should return a - * Function<Flux<Object>,Flux<Object>> - * @return the list of Classes produced by compiling and then loading the snippet of code - */ - private CompilationResult buildAndCompileSourceCode(String className, String methodBody) { - String sourceCode = makeSourceClassDefinition(className, methodBody); - return compiler.compile(className, sourceCode); - } - - private static String decode(String input) { - return input.replaceAll(NEWLINE_ESCAPE, "\n").replaceAll(DOUBLE_DOUBLE_QUOTE, "\""); - } - - /** - * Make a full source code definition for a class by applying the specified method body - * to the Reactive template. - * - * @param className the name of the class - * @param methodBody the code to insert into the Reactive source class template - * @return a complete Java Class definition - */ - private static String makeSourceClassDefinition(String className, String methodBody) { - String shortClassName = className.substring(className.lastIndexOf('.') + 1); - return String.format(SOURCE_CODE_TEMPLATE, shortClassName, methodBody); - } - } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionFactory.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionFactory.java index 06ed6aeed..f5f7c864b 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionFactory.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionFactory.java @@ -21,8 +21,6 @@ import java.util.function.Function; /** * @author Mark Fisher */ -public interface FunctionFactory { - - Function getFunction(); - +public interface FunctionFactory extends CompilationResultFactory> { + } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierCompiler.java new file mode 100644 index 000000000..23a57c467 --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierCompiler.java @@ -0,0 +1,29 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.function.Supplier; + +/** + * @author Mark Fisher + */ +public class SupplierCompiler extends AbstractFunctionCompiler> { + + public SupplierCompiler() { + super(ResultType.Supplier, "Flux"); + } +} diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierFactory.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierFactory.java new file mode 100644 index 000000000..6cabffb6b --- /dev/null +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/SupplierFactory.java @@ -0,0 +1,26 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.compiler; + +import java.util.function.Supplier; + +/** + * @author Mark Fisher + */ +public interface SupplierFactory extends CompilationResultFactory> { + +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/FunctionRegistrar.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/FunctionRegistrar.java index ab85dcf87..4a6c29a88 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/FunctionRegistrar.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/FunctionRegistrar.java @@ -24,11 +24,25 @@ import org.springframework.cloud.function.registry.FileSystemFunctionRegistry; public class FunctionRegistrar { public static void main(String[] args) { - if (args.length != 2) { - System.err.println("USAGE: java FunctionRegistrar functionName functionBody"); + String usage = "USAGE: java FunctionRegistrar [supplier|function|consumer] name lambda"; + if (args.length != 3) { + System.err.println(usage); System.exit(1); } FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(); - registry.register(args[0], args[1]); + String type = args[0]; + if ("supplier".equalsIgnoreCase(type)) { + registry.registerSupplier(args[1], args[2]); + } + else if ("function".equalsIgnoreCase(type)) { + registry.registerFunction(args[1], args[2]); + } + else if ("consumer".equalsIgnoreCase(type)) { + registry.registerConsumer(args[1], args[2]); + } + else { + System.err.println(usage); + System.exit(1); + } } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java index 71cfee901..8275777d6 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/gateway/LocalFunctionGateway.java @@ -48,19 +48,19 @@ public class LocalFunctionGateway implements FunctionGateway { @Override public R invoke(String functionName, T request) { - Function function = this.registry.lookup(functionName); + Function function = this.registry.lookupFunction(functionName); return function.apply(request); } @Override public void schedule(String functionName, Trigger trigger, Supplier supplier, Consumer consumer) { - Function function = this.registry.lookup(functionName); + Function function = this.registry.lookupFunction(functionName); this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger); } @Override public void subscribe(Publisher publisher, String functionName, final Consumer consumer) { - final Function function = this.registry.lookup(functionName); + final Function function = this.registry.lookupFunction(functionName); publisher.subscribe(new Subscriber() { @Override diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/AbstractFunctionRegistry.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/AbstractFunctionRegistry.java index 43d4e674f..f5f023f7b 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/AbstractFunctionRegistry.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/AbstractFunctionRegistry.java @@ -18,11 +18,17 @@ package org.springframework.cloud.function.registry; import java.util.HashMap; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.springframework.cloud.function.compiler.CompiledFunctionFactory; +import org.springframework.cloud.function.compiler.ConsumerCompiler; +import org.springframework.cloud.function.compiler.ConsumerFactory; import org.springframework.cloud.function.compiler.FunctionCompiler; import org.springframework.cloud.function.compiler.FunctionFactory; +import org.springframework.cloud.function.compiler.SupplierCompiler; +import org.springframework.cloud.function.compiler.SupplierFactory; import org.springframework.cloud.function.compiler.java.SimpleClassLoader; import org.springframework.util.Assert; @@ -31,53 +37,125 @@ import org.springframework.util.Assert; */ public abstract class AbstractFunctionRegistry implements FunctionRegistry { - private static final Map> FACTORY_CACHE = new HashMap<>(); + private static final Map> CONSUMER_FACTORY_CACHE = new HashMap<>(); - private final FunctionCompiler compiler = new FunctionCompiler(); + private static final Map> FUNCTION_FACTORY_CACHE = new HashMap<>(); + + private static final Map> SUPPLIER_FACTORY_CACHE = new HashMap<>(); + + private final ConsumerCompiler consumerCompiler = new ConsumerCompiler<>(); + + private final FunctionCompiler functionCompiler = new FunctionCompiler<>(); + + private final SupplierCompiler supplierCompiler = new SupplierCompiler<>(); private final SimpleClassLoader classLoader = new SimpleClassLoader(AbstractFunctionRegistry.class.getClassLoader()); @Override @SuppressWarnings("unchecked") - public Function compose(String... functionNames) { + public Function composeFunction(String... functionNames) { @SuppressWarnings("rawtypes") - Function function = this.lookup(functionNames[0]); + Function function = this.lookupFunction(functionNames[0]); for (int i = 1; i < functionNames.length; i++) { - function = function.andThen(this.lookup(functionNames[i])); + function = function.andThen(this.lookupFunction(functionNames[i])); } return function; } @Override - public final Function lookup(String name) { + public Consumer lookupConsumer(String name) { @SuppressWarnings("unchecked") - FunctionFactory factory = (FunctionFactory) FACTORY_CACHE.get(name); + ConsumerFactory factory = (ConsumerFactory) CONSUMER_FACTORY_CACHE.get(name); if (factory != null) { - return factory.getFunction(); + return factory.getResult(); } - return this.doLookup(name); + return this.doLookupConsumer(name); } - protected abstract Function doLookup(String name); + @Override + public final Function lookupFunction(String name) { + @SuppressWarnings("unchecked") + FunctionFactory factory = (FunctionFactory) FUNCTION_FACTORY_CACHE.get(name); + if (factory != null) { + return factory.getResult(); + } + return this.doLookupFunction(name); + } - protected CompiledFunctionFactory compile(String name, String code) { - return this.compiler.compile(name, code); + @Override + public Supplier lookupSupplier(String name) { + @SuppressWarnings("unchecked") + SupplierFactory factory = (SupplierFactory) SUPPLIER_FACTORY_CACHE.get(name); + if (factory != null) { + return factory.getResult(); + } + return this.doLookupSupplier(name); + } + + protected abstract Consumer doLookupConsumer(String name); + + protected abstract Function doLookupFunction(String name); + + protected abstract Supplier doLookupSupplier(String name); + + protected CompiledFunctionFactory> compileConsumer(String name, String code) { + return this.consumerCompiler.compile(name, code); + } + + protected CompiledFunctionFactory> compileFunction(String name, String code) { + return this.functionCompiler.compile(name, code); + } + + protected CompiledFunctionFactory> compileSupplier(String name, String code) { + return this.supplierCompiler.compile(name, code); } @SuppressWarnings("unchecked") - protected Function deserialize(final String name, byte[] bytes) { - Assert.hasLength(name, "name must not be empty"); - String firstLetter = name.substring(0, 1).toUpperCase(); - String upperCasedName = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter; - String className = String.format("%s.%sFunctionFactory", FunctionCompiler.class.getPackage().getName(), upperCasedName); + protected Consumer deserializeConsumer(final String name, byte[] bytes) { + String className = formatClassName(name, Consumer.class); + Class factoryClass = this.classLoader.defineClass(className, bytes); + try { + ConsumerFactory factory = ((ConsumerFactory) factoryClass.newInstance()); + CONSUMER_FACTORY_CACHE.put(name, factory); + return factory.getResult(); + } + catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("failed to deserialize Consumer", e); + } + } + + @SuppressWarnings("unchecked") + protected Function deserializeFunction(final String name, byte[] bytes) { + String className = formatClassName(name, Function.class); Class factoryClass = this.classLoader.defineClass(className, bytes); try { FunctionFactory factory = ((FunctionFactory) factoryClass.newInstance()); - FACTORY_CACHE.put(name, factory); - return factory.getFunction(); + FUNCTION_FACTORY_CACHE.put(name, factory); + return factory.getResult(); } catch (InstantiationException | IllegalAccessException e) { - throw new IllegalArgumentException("failed to deserialize function", e); + throw new IllegalArgumentException("failed to deserialize Function", e); } } + + @SuppressWarnings("unchecked") + protected Supplier deserializeSupplier(final String name, byte[] bytes) { + String className = formatClassName(name, Supplier.class); + Class factoryClass = this.classLoader.defineClass(className, bytes); + try { + SupplierFactory factory = ((SupplierFactory) factoryClass.newInstance()); + SUPPLIER_FACTORY_CACHE.put(name, factory); + return factory.getResult(); + } + catch (InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException("failed to deserialize Supplier", e); + } + } + + private String formatClassName(String name, Class type) { + Assert.hasLength(name, "name must not be empty"); + String firstLetter = name.substring(0, 1).toUpperCase(); + String upperCasedName = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter; + return String.format("%s.%s%sFactory", FunctionCompiler.class.getPackage().getName(), upperCasedName, type.getSimpleName()); + } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistry.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistry.java index 5fbb3bd0f..370cb1007 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistry.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistry.java @@ -18,7 +18,9 @@ package org.springframework.cloud.function.registry; import java.io.File; import java.io.IOException; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.springframework.cloud.function.compiler.CompiledFunctionFactory; import org.springframework.util.Assert; @@ -29,7 +31,17 @@ import org.springframework.util.FileCopyUtils; */ public class FileSystemFunctionRegistry extends AbstractFunctionRegistry { - private final File directory; + private static final String CONSUMER_DIRECTORY = "consumers"; + + private static final String FUNCTION_DIRECTORY = "functions"; + + private static final String SUPPLIER_DIRECTORY = "suppliers"; + + private final File consumerDirectory; + + private final File functionDirectory; + + private final File supplierDirectory; public FileSystemFunctionRegistry() { this(new File("/tmp/function-registry")); @@ -44,28 +56,80 @@ public class FileSystemFunctionRegistry extends AbstractFunctionRegistry { Assert.isTrue(directory.isDirectory(), String.format("%s is not a directory.", directory.getAbsolutePath())); } - this.directory = directory; + this.consumerDirectory = new File(directory, CONSUMER_DIRECTORY); + this.functionDirectory = new File(directory, FUNCTION_DIRECTORY); + this.supplierDirectory = new File(directory, SUPPLIER_DIRECTORY); + this.consumerDirectory.mkdir(); + this.functionDirectory.mkdir(); + this.supplierDirectory.mkdir(); } @Override - public Function doLookup(String name) { + public Consumer doLookupConsumer(String name) { try { - byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.directory, fileName(name))); - return this.deserialize(name, bytes); + byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.consumerDirectory, fileName(name))); + return this.deserializeConsumer(name, bytes); } catch (IOException e) { - throw new IllegalArgumentException(String.format("failed to lookup function: %s", name), e); + throw new IllegalArgumentException(String.format("failed to lookup Consumer: %s", name), e); } } - public void register(String name, String function) { - CompiledFunctionFactory factory = this.compile(name, function); - File file = new File(this.directory, fileName(name)); + @Override + public Function doLookupFunction(String name) { + try { + byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.functionDirectory, fileName(name))); + return this.deserializeFunction(name, bytes); + } + catch (IOException e) { + throw new IllegalArgumentException(String.format("failed to lookup Function: %s", name), e); + } + } + + @Override + public Supplier doLookupSupplier(String name) { + try { + byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.supplierDirectory, fileName(name))); + return this.deserializeSupplier(name, bytes); + } + catch (IOException e) { + throw new IllegalArgumentException(String.format("failed to lookup Supplier: %s", name), e); + } + } + + @Override + public void registerConsumer(String name, String consumer) { + CompiledFunctionFactory factory = this.compileConsumer(name, consumer); + File file = new File(this.consumerDirectory, fileName(name)); try { FileCopyUtils.copy(factory.getGeneratedClassBytes(), file); } catch (IOException e) { - throw new IllegalArgumentException(String.format("failed to register function: %s", name), e); + throw new IllegalArgumentException(String.format("failed to register Consumer: %s", name), e); + } + } + + @Override + public void registerFunction(String name, String function) { + CompiledFunctionFactory factory = this.compileFunction(name, function); + File file = new File(this.functionDirectory, fileName(name)); + try { + FileCopyUtils.copy(factory.getGeneratedClassBytes(), file); + } + catch (IOException e) { + throw new IllegalArgumentException(String.format("failed to register Function: %s", name), e); + } + } + + @Override + public void registerSupplier(String name, String supplier) { + CompiledFunctionFactory factory = this.compileSupplier(name, supplier); + File file = new File(this.supplierDirectory, fileName(name)); + try { + FileCopyUtils.copy(factory.getGeneratedClassBytes(), file); + } + catch (IOException e) { + throw new IllegalArgumentException(String.format("failed to register Supplier: %s", name), e); } } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java index f3f6609cd..8fc8bc685 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/FunctionRegistry.java @@ -16,17 +16,26 @@ package org.springframework.cloud.function.registry; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; /** * @author Mark Fisher */ public interface FunctionRegistry { - void register(String name, String function); + void registerConsumer(String name, String consumer); - Function lookup(String name); + void registerFunction(String name, String function); - Function compose(String... functionNames); + void registerSupplier(String name, String supplier); + Consumer lookupConsumer(String name); + + Function lookupFunction(String name); + + Function composeFunction(String... functionNames); + + Supplier lookupSupplier(String name); } diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/InMemoryFunctionRegistry.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/InMemoryFunctionRegistry.java index 0105d0c1a..1cdd6e4f7 100644 --- a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/InMemoryFunctionRegistry.java +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/registry/InMemoryFunctionRegistry.java @@ -17,23 +17,51 @@ package org.springframework.cloud.function.registry; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; /** * @author Mark Fisher */ public class InMemoryFunctionRegistry extends AbstractFunctionRegistry { - private final ConcurrentHashMap> map = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> consumerMap = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap> functionMap = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap> supplierMap = new ConcurrentHashMap<>(); @Override @SuppressWarnings("unchecked") - public Function doLookup(String name) { - return this.map.get(name); + protected Consumer doLookupConsumer(String name) { + return this.consumerMap.get(name); } @Override - public void register(String name, String function) { - this.map.put(name, this.compile(name, function).getFunction()); + @SuppressWarnings("unchecked") + public Function doLookupFunction(String name) { + return this.functionMap.get(name); + } + + @Override + @SuppressWarnings("unchecked") + protected Supplier doLookupSupplier(String name) { + return this.supplierMap.get(name); + } + + @Override + public void registerConsumer(String name, String consumer) { + this.consumerMap.put(name, this.compileConsumer(name, consumer).getResult()); + } + + @Override + public void registerFunction(String name, String function) { + this.functionMap.put(name, this.compileFunction(name, function).getResult()); + } + + @Override + public void registerSupplier(String name, String supplier) { + this.supplierMap.put(name, this.compileSupplier(name, supplier).getResult()); } } diff --git a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java index a67d6615c..23a282bf7 100644 --- a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java +++ b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/gateway/LocalFunctionGatewayTests.java @@ -40,6 +40,7 @@ public class LocalFunctionGatewayTests { @Before public void init() { + registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())"); this.scheduler.initialize(); } diff --git a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistryTests.java b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistryTests.java index b537b2add..e57e3dff4 100644 --- a/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistryTests.java +++ b/spring-cloud-function-core/src/test/java/org/springframework/cloud/function/registry/FileSystemFunctionRegistryTests.java @@ -43,10 +43,10 @@ public class FileSystemFunctionRegistryTests { } @Test - public void registerAndLookup() throws IOException { + public void registerAndLookupFunction() throws IOException { FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(this.directory); - registry.register("uppercase", "f->f.map(s->s.toString().toUpperCase())"); - Function, Flux> function = registry.lookup("uppercase"); + registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())"); + Function, Flux> function = registry.lookupFunction("uppercase"); Flux output = function.apply(Flux.just("foo", "bar")); List results = output.collectList().block(); assertEquals("FOO", results.get(0)); @@ -54,11 +54,11 @@ public class FileSystemFunctionRegistryTests { } @Test - public void compose() throws IOException { + public void composeFunction() throws IOException { FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(this.directory); - registry.register("uppercase", "f->f.map(s->s.toString().toUpperCase())"); - registry.register("exclaim", "f->f.map(s->s+\"!!!\")"); - Function, Flux> function = registry.compose("uppercase", "exclaim"); + registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())"); + registry.registerFunction("exclaim", "f->f.map(s->s+\"!!!\")"); + Function, Flux> function = registry.composeFunction("uppercase", "exclaim"); Flux output = function.apply(Flux.just("foo", "bar")); List results = output.collectList().block(); assertEquals("FOO!!!", results.get(0)); diff --git a/spring-cloud-function-stream/pom.xml b/spring-cloud-function-stream/pom.xml index aafa6419a..96af96f04 100644 --- a/spring-cloud-function-stream/pom.xml +++ b/spring-cloud-function-stream/pom.xml @@ -38,7 +38,7 @@ org.springframework.cloud - spring-cloud-stream-binder-kafka + spring-cloud-stream-binder-rabbit ${spring-cloud-stream.version} diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 56a1e30dd..ac4500a12 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -49,8 +49,8 @@ public class StreamConfiguration { public AbstractFunctionInvoker invoker(FunctionRegistry registry) { String name = properties.getName(); Function, Flux> function = (name.indexOf(',') == -1) - ? registry.lookup(name) - : registry.compose(StringUtils.commaDelimitedListToStringArray(name)); + ? registry.lookupFunction(name) + : registry.composeFunction(StringUtils.commaDelimitedListToStringArray(name)); return new StreamListeningFunctionInvoker(function); } } diff --git a/spring-cloud-function-task/.jdk8 b/spring-cloud-function-task/.jdk8 new file mode 100644 index 000000000..e69de29bb diff --git a/spring-cloud-function-task/pom.xml b/spring-cloud-function-task/pom.xml new file mode 100644 index 000000000..4756b732f --- /dev/null +++ b/spring-cloud-function-task/pom.xml @@ -0,0 +1,49 @@ + + + 4.0.0 + + spring-cloud-function-task + jar + Spring Cloud Function Task Support + Spring Cloud Function Task Support + + + org.springframework.cloud + spring-cloud-function-parent + 1.0.0.BUILD-SNAPSHOT + + + + 1.8 + + + + + org.springframework.cloud + spring-cloud-task-starter + 1.1.0.BUILD-SNAPSHOT + + + org.springframework.cloud + spring-cloud-function-core + ${project.version} + + + io.projectreactor + reactor-core + + + org.springframework.boot + spring-boot-starter-logging + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/LambdaConfigurationProperties.java b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/LambdaConfigurationProperties.java new file mode 100644 index 000000000..2286894a3 --- /dev/null +++ b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/LambdaConfigurationProperties.java @@ -0,0 +1,56 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.task; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author Mark Fisher + */ +@ConfigurationProperties(prefix = "lambda") +public class LambdaConfigurationProperties { + + private String supplier; + + private String function; + + private String consumer; + + public String getSupplier() { + return supplier; + } + + public void setSupplier(String supplier) { + this.supplier = supplier; + } + + public String getFunction() { + return function; + } + + public void setFunction(String function) { + this.function = function; + } + + public String getConsumer() { + return consumer; + } + + public void setConsumer(String consumer) { + this.consumer = consumer; + } +} diff --git a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskApplication.java b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskApplication.java new file mode 100644 index 000000000..9218ec86c --- /dev/null +++ b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskApplication.java @@ -0,0 +1,31 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.task; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * @author Mark Fisher + */ +@SpringBootApplication +public class TaskApplication { + + public static void main(String[] args) { + SpringApplication.run(TaskApplication.class, args); + } +} diff --git a/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java new file mode 100644 index 000000000..ddbf8cdd7 --- /dev/null +++ b/spring-cloud-function-task/src/main/java/org/springframework/cloud/function/task/TaskConfiguration.java @@ -0,0 +1,98 @@ +/* + * Copyright 2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.task; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.cloud.function.registry.FileSystemFunctionRegistry; +import org.springframework.cloud.function.registry.FunctionRegistry; +import org.springframework.cloud.task.configuration.EnableTask; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +@Configuration +@EnableTask +@EnableConfigurationProperties(LambdaConfigurationProperties.class) +public class TaskConfiguration { + + @Autowired + private LambdaConfigurationProperties properties; + + @Bean + public FunctionRegistry registry() { + return new FileSystemFunctionRegistry(); + } + + @Bean + public CommandLineRunner commandLineRunner(FunctionRegistry registry) { + final Supplier> supplier = registry.lookupSupplier(properties.getSupplier()); + final Function, Flux> function = registry.lookupFunction(properties.getFunction()); + final Consumer consumer = registry.lookupConsumer(properties.getConsumer()); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicBoolean status = new AtomicBoolean(); + CommandLineRunner runner = new CommandLineRunner() { + + @Override + public void run(String... args) throws Exception { + function.apply(supplier.get()).subscribe(consumer, + new CompletionConsumer(latch, status, false), + new CompletionConsumer(latch, status, true)); + latch.await(); + } + }; + return runner; + } + + private static class CompletionConsumer implements Consumer, Runnable { + + private final CountDownLatch latch; + + private final AtomicBoolean status; + + private final boolean value; + + private CompletionConsumer(CountDownLatch latch, AtomicBoolean status, boolean value) { + this.latch = latch; + this.status = status; + this.value = value; + } + + @Override + public void accept(Throwable t) { + System.err.println("task failed: " + t); + this.run(); + } + + @Override + public void run() { + this.status.set(this.value); + this.latch.countDown(); + } + } +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java index e4ec41693..682a2a537 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RestConfiguration.java @@ -68,8 +68,8 @@ public class RestConfiguration { public HttpHandler httpHandler(FunctionRegistry registry) { String name = functionProperties.getName(); Function, Flux> function = (name.indexOf(',') == -1) - ? registry.lookup(name) - : registry.compose(StringUtils.commaDelimitedListToStringArray(name)); + ? registry.lookupFunction(name) + : registry.composeFunction(StringUtils.commaDelimitedListToStringArray(name)); FunctionInvokingHandler handler = new FunctionInvokingHandler(function); RouterFunction> route = RouterFunctions.route( POST(webProperties.getPath()).and(contentType(TEXT_PLAIN)), handler::handleText);