From 19fd056a5e48d569df7d58979c4fad4273b9b69d Mon Sep 17 00:00:00 2001 From: markfisher Date: Mon, 20 Feb 2017 10:31:22 -0500 Subject: [PATCH] add support for simple (non-Flux) types add objectToStringHttpMessageConverter CompilerController accepts parameterized types --- scripts/registerFunction.sh | 11 +- scripts/registerSupplier.sh | 7 +- spring-cloud-function-compiler/pom.xml | 5 + .../compiler/AbstractFunctionCompiler.java | 33 +++--- .../function/compiler/FunctionCompiler.java | 12 ++- .../app/CompiledFunctionRegistry.java | 8 +- .../compiler/app/CompilerController.java | 32 +++--- .../FunctionProxyApplicationListener.java | 18 +++- .../proxy/AbstractByteCodeLoadingProxy.java | 2 +- .../proxy/AbstractLambdaCompilingProxy.java | 10 +- .../proxy/ByteCodeLoadingFunction.java | 3 +- .../proxy/ByteCodeLoadingSupplier.java | 3 +- .../proxy/LambdaCompilingFunction.java | 3 +- .../proxy/LambdaCompilingSupplier.java | 3 +- .../cloud/function/support/FluxFunction.java | 44 ++++++++ .../cloud/function/support/FluxSupplier.java | 57 ++++++++++ .../cloud/function/support/FunctionProxy.java | 34 ++++++ .../cloud/function/support/FunctionUtils.java | 99 +++++++++++++++++ .../cloud/function/support/SupplierProxy.java | 33 ++++++ .../java/com/example/SampleApplication.java | 38 ++++--- .../com/example/functions/CharCounter.java | 30 ++++++ .../java/com/example/functions/Exclaimer.java | 32 ++++++ .../java/com/example/functions/Greeter.java | 30 ++++++ .../test/java/com/example/FunctionTests.java | 84 +++++++++++++++ .../com/example/SampleApplicationTests.java | 102 ++++++++++++++++-- .../function/stream/StreamConfiguration.java | 6 +- .../StreamListeningFunctionInvoker.java | 19 +++- .../SupplierInvokingMessageProducer.java | 11 +- .../function/web/FunctionController.java | 14 ++- .../web/flux/ReactorAutoConfiguration.java | 17 ++- 30 files changed, 717 insertions(+), 83 deletions(-) create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionProxy.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java create mode 100644 spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/SupplierProxy.java create mode 100644 spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/CharCounter.java create mode 100644 spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Exclaimer.java create mode 100644 spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Greeter.java create mode 100644 spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/FunctionTests.java diff --git a/scripts/registerFunction.sh b/scripts/registerFunction.sh index 7fcb02239..34ce2f921 100755 --- a/scripts/registerFunction.sh +++ b/scripts/registerFunction.sh @@ -1,6 +1,6 @@ #!/bin/bash -while getopts ":n:f:" opt; do +while getopts ":n:f:i:o:" opt; do case $opt in n) NAME=$OPTARG @@ -8,7 +8,14 @@ while getopts ":n:f:" opt; do f) FUNC=$OPTARG ;; + i) + INTYPE=$OPTARG + ;; + o) + OUTTYPE=$OPTARG + ;; esac done -curl -X POST -H "Content-Type: text/plain" -d $FUNC :8080/function/$NAME +curl -X POST -H "Content-Type: text/plain" -d $FUNC ":8080/function/$NAME?inputType=$INTYPE&outputType=$OUTTYPE" + diff --git a/scripts/registerSupplier.sh b/scripts/registerSupplier.sh index 84291c5b0..107368990 100755 --- a/scripts/registerSupplier.sh +++ b/scripts/registerSupplier.sh @@ -1,6 +1,6 @@ #!/bin/bash -while getopts ":n:f:" opt; do +while getopts ":n:f:t:" opt; do case $opt in n) NAME=$OPTARG @@ -8,7 +8,10 @@ while getopts ":n:f:" opt; do f) FUNC=$OPTARG ;; + t) + TYPE=$OPTARG + ;; esac done -curl -X POST -H "Content-Type: text/plain" -d $FUNC :8080/supplier/$NAME +curl -X POST -H "Content-Type: text/plain" -d $FUNC :8080/supplier/$NAME?type=$TYPE diff --git a/spring-cloud-function-compiler/pom.xml b/spring-cloud-function-compiler/pom.xml index 9327d62f1..5979bd875 100644 --- a/spring-cloud-function-compiler/pom.xml +++ b/spring-cloud-function-compiler/pom.xml @@ -41,6 +41,11 @@ org.springframework spring-context + + org.springframework.cloud + spring-cloud-function-core + ${project.version} + org.springframework.boot spring-boot-starter-web diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java index ad563a5fe..5044b5341 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/AbstractFunctionCompiler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,8 @@ import org.springframework.cloud.function.compiler.java.CompilationFailedExcepti import org.springframework.cloud.function.compiler.java.CompilationMessage; import org.springframework.cloud.function.compiler.java.CompilationResult; import org.springframework.cloud.function.compiler.java.RuntimeJavaCompiler; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; /** * @author Andy Clement @@ -59,13 +61,13 @@ public abstract class AbstractFunctionCompiler { private final ResultType resultType; - private final String parameterizedTypes; + private final String[] defaultResultTypeParameterizations; private final RuntimeJavaCompiler compiler = new RuntimeJavaCompiler(); - AbstractFunctionCompiler(ResultType type, String parameterizedTypes) { + AbstractFunctionCompiler(ResultType type, String... defaultResultTypeParameterizations) { this.resultType = type; - this.parameterizedTypes = parameterizedTypes; + this.defaultResultTypeParameterizations = defaultResultTypeParameterizations; } /** @@ -80,23 +82,25 @@ public abstract class AbstractFunctionCompiler { * * @return a factory instance */ - public CompiledFunctionFactory compile(String name, String code) { + public CompiledFunctionFactory compile(String name, String code, String... resultTypeParameterizations) { if (name == null || name.length() == 0) { throw new IllegalArgumentException("name must not be empty"); } logger.info("Initial code property value :'{}'", code); + String parameterizations = StringUtils.arrayToCommaDelimitedString( + (!ObjectUtils.isEmpty(resultTypeParameterizations)) ? resultTypeParameterizations : this.defaultResultTypeParameterizations); code = decode(code); if (code.startsWith("\"") && code.endsWith("\"")) { code = code.substring(1,code.length()-1); } if (!code.startsWith("return ") && !code.endsWith(";")) { - code = String.format("return (%s<%s> & java.io.Serializable) %s;", resultType, this.parameterizedTypes, code); + code = String.format("return (%s<%s> & java.io.Serializable) %s;", resultType, parameterizations, code); } logger.info("Processed code property value :\n{}\n", code); String firstLetter = name.substring(0, 1).toUpperCase(); name = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter; String className = String.format("%s.%s%sFactory", this.getClass().getPackage().getName(), name, resultType); - CompilationResult compilationResult = buildAndCompileSourceCode(className, code); + CompilationResult compilationResult = buildAndCompileSourceCode(className, code, parameterizations); if (compilationResult.wasSuccessful()) { return new CompiledFunctionFactory(className, compilationResult); } @@ -107,17 +111,18 @@ public abstract class AbstractFunctionCompiler { /** * Create the source for and then compile and load a class that embodies * the supplied methodBody. The methodBody is inserted into a class template that - * returns a Function<Flux<Object>,Flux<Object>>. + * returns the specified parameterized type. * This method can return more than one class if the method body includes local class * declarations. An example methodBody would be return input -> input.buffer(5).map(list->list.get(0));. * * @param className the name of the class - * @param methodBody the source code for a method that should return a - * Function<Flux<Object>,Flux<Object>> + * @param methodBody the source code for a method + * @param parameterTypeString the String representation for the parameterized return type, e.g.: + * <Flux<Object>,Flux<Object> * @return the list of Classes produced by compiling and then loading the snippet of code */ - private CompilationResult buildAndCompileSourceCode(String className, String methodBody) { - String sourceCode = makeSourceClassDefinition(className, methodBody); + private CompilationResult buildAndCompileSourceCode(String className, String methodBody, String parameterTypeString) { + String sourceCode = makeSourceClassDefinition(className, methodBody, parameterTypeString); return compiler.compile(className, sourceCode); } @@ -133,9 +138,9 @@ public abstract class AbstractFunctionCompiler { * @param methodBody the code to insert into the Reactive source class template * @return a complete Java Class definition */ - private String makeSourceClassDefinition(String className, String methodBody) { + private String makeSourceClassDefinition(String className, String methodBody, String types) { String shortClassName = className.substring(className.lastIndexOf('.') + 1); - String s = String.format(SOURCE_CODE_TEMPLATE, shortClassName, resultType, resultType, this.parameterizedTypes, methodBody); + String s = String.format(SOURCE_CODE_TEMPLATE, shortClassName, resultType, resultType, types, methodBody); System.out.println(s); return s; } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java index a896e90b2..1cc58113a 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/FunctionCompiler.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,14 @@ import java.util.function.Function; public class FunctionCompiler extends AbstractFunctionCompiler> { public FunctionCompiler() { - super(ResultType.Function, "Flux, Flux"); + this("Flux"); + } + + public FunctionCompiler(String type) { + this(type, type); + } + + public FunctionCompiler(String inputType, String outputType) { + super(ResultType.Function, String.format("%s, %s", inputType, outputType)); } } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompiledFunctionRegistry.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompiledFunctionRegistry.java index a18782884..7177e7675 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompiledFunctionRegistry.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompiledFunctionRegistry.java @@ -72,8 +72,8 @@ public class CompiledFunctionRegistry { this.consumerDirectory.mkdir(); } - public void registerSupplier(String name, String supplier) { - CompiledFunctionFactory factory = this.supplierCompiler.compile(name, supplier); + public void registerSupplier(String name, String supplier, String type) { + CompiledFunctionFactory factory = this.supplierCompiler.compile(name, supplier, type); File file = new File(this.supplierDirectory, fileName(name)); try { FileCopyUtils.copy(factory.getGeneratedClassBytes(), file); @@ -83,8 +83,8 @@ public class CompiledFunctionRegistry { } } - public void registerFunction(String name, String function) { - CompiledFunctionFactory factory = this.functionCompiler.compile(name, function); + public void registerFunction(String name, String function, String... types) { + CompiledFunctionFactory factory = this.functionCompiler.compile(name, function, types); File file = new File(this.functionDirectory, fileName(name)); try { FileCopyUtils.copy(factory.getGeneratedClassBytes(), file); diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompilerController.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompilerController.java index 00b949f7f..49134959a 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompilerController.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/app/CompilerController.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.compiler.app; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -29,20 +30,21 @@ public class CompilerController { private final CompiledFunctionRegistry registry = new CompiledFunctionRegistry(); - @PostMapping(path = "/{type}/{name}") - public void registerFunction(@PathVariable String type, @PathVariable String name, @RequestBody String lambda) { - switch (type) { - case "supplier": - registry.registerSupplier(name, lambda); - break; - case "function": - registry.registerFunction(name, lambda); - break; - case "consumer": - registry.registerConsumer(name, lambda); - break; - default: - throw new IllegalArgumentException("unknown type: " + type); - } + @PostMapping(path = "/supplier/{name}") + public void registerSupplier(@PathVariable String name, @RequestBody String lambda, + @RequestParam(defaultValue="Flux") String type) { + this.registry.registerSupplier(name, lambda, type); + } + + @PostMapping(path = "/function/{name}") + public void registerFunction(@PathVariable String name, @RequestBody String lambda, + @RequestParam(defaultValue="Flux") String inputType, + @RequestParam(defaultValue="Flux") String outputType) { + this.registry.registerFunction(name, lambda, inputType, outputType); + } + + @PostMapping(path = "/consumer/{name}") + public void registerConsumer(@PathVariable String name, @RequestBody String lambda) { + this.registry.registerConsumer(name, lambda); } } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/config/FunctionProxyApplicationListener.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/config/FunctionProxyApplicationListener.java index f383de0ed..0b51f24d8 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/config/FunctionProxyApplicationListener.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/config/FunctionProxyApplicationListener.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.compiler.config; import java.util.Map; +import org.springframework.beans.MutablePropertyValues; import org.springframework.beans.factory.config.ConstructorArgumentValues; import org.springframework.beans.factory.support.DefaultListableBeanFactory; import org.springframework.beans.factory.support.RootBeanDefinition; @@ -68,7 +69,9 @@ public class FunctionProxyApplicationListener implements ApplicationListener proxyClass = null; if ("supplier".equals(type.toLowerCase())) { proxyClass = LambdaCompilingSupplier.class; args.addGenericArgumentValue(this.supplierCompiler); + if (outputType != null) { + props.add("typeParameterizations", outputType); + } } else if ("consumer".equals(type.toLowerCase())) { proxyClass = LambdaCompilingConsumer.class; @@ -107,9 +114,16 @@ public class FunctionProxyApplicationListener implements ApplicationListener implements InitializingBea } } - protected final T getTarget() { + public final T getTarget() { return this.target; } } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/AbstractLambdaCompilingProxy.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/AbstractLambdaCompilingProxy.java index b77df9b52..c832178d4 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/AbstractLambdaCompilingProxy.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/AbstractLambdaCompilingProxy.java @@ -39,6 +39,8 @@ public class AbstractLambdaCompilingProxy implements InitializingBean, BeanNa private T target; + private String[] typeParameterizations; + public AbstractLambdaCompilingProxy(Resource resource, AbstractFunctionCompiler compiler) { Assert.notNull(resource, "Resource must not be null"); Assert.notNull(compiler, "Compiler must not be null"); @@ -51,14 +53,18 @@ public class AbstractLambdaCompilingProxy implements InitializingBean, BeanNa this.beanName = beanName; } + public void setTypeParameterizations(String... typeParameterizations) { + this.typeParameterizations = typeParameterizations; + } + @Override public void afterPropertiesSet() throws Exception { String lambda = FileCopyUtils.copyToString(new InputStreamReader(this.resource.getInputStream())); - CompiledFunctionFactory factory = this.compiler.compile(this.beanName, lambda); + CompiledFunctionFactory factory = this.compiler.compile(this.beanName, lambda, this.typeParameterizations); this.target = factory.getResult(); } - protected final T getTarget() { + public final T getTarget() { return this.target; } } diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingFunction.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingFunction.java index 345daadbc..a5770c2bd 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingFunction.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingFunction.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.compiler.proxy; import java.util.function.Function; import org.springframework.beans.factory.InitializingBean; +import org.springframework.cloud.function.support.FunctionProxy; import org.springframework.core.io.Resource; /** @@ -27,7 +28,7 @@ import org.springframework.core.io.Resource; * @param Function input type * @param Function result type */ -public class ByteCodeLoadingFunction extends AbstractByteCodeLoadingProxy> implements Function, InitializingBean { +public class ByteCodeLoadingFunction extends AbstractByteCodeLoadingProxy> implements FunctionProxy, InitializingBean { public ByteCodeLoadingFunction(Resource resource) { super(resource, Function.class); diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingSupplier.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingSupplier.java index 3b6fedc0b..0b14fc6a7 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingSupplier.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/ByteCodeLoadingSupplier.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.compiler.proxy; import java.util.function.Supplier; +import org.springframework.cloud.function.support.SupplierProxy; import org.springframework.core.io.Resource; /** @@ -25,7 +26,7 @@ import org.springframework.core.io.Resource; * * @param type */ -public class ByteCodeLoadingSupplier extends AbstractByteCodeLoadingProxy> implements Supplier { +public class ByteCodeLoadingSupplier extends AbstractByteCodeLoadingProxy> implements SupplierProxy { public ByteCodeLoadingSupplier(Resource resource) { super(resource, Supplier.class); diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingFunction.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingFunction.java index 855231919..32687c6d0 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingFunction.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingFunction.java @@ -19,12 +19,13 @@ package org.springframework.cloud.function.compiler.proxy; import java.util.function.Function; import org.springframework.cloud.function.compiler.FunctionCompiler; +import org.springframework.cloud.function.support.FunctionProxy; import org.springframework.core.io.Resource; /** * @author Mark Fisher */ -public class LambdaCompilingFunction extends AbstractLambdaCompilingProxy> implements Function { +public class LambdaCompilingFunction extends AbstractLambdaCompilingProxy> implements FunctionProxy { public LambdaCompilingFunction(Resource resource, FunctionCompiler compiler) { super(resource, compiler); diff --git a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingSupplier.java b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingSupplier.java index a4996f70c..ea88752a7 100644 --- a/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingSupplier.java +++ b/spring-cloud-function-compiler/src/main/java/org/springframework/cloud/function/compiler/proxy/LambdaCompilingSupplier.java @@ -19,12 +19,13 @@ package org.springframework.cloud.function.compiler.proxy; import java.util.function.Supplier; import org.springframework.cloud.function.compiler.SupplierCompiler; +import org.springframework.cloud.function.support.SupplierProxy; import org.springframework.core.io.Resource; /** * @author Mark Fisher */ -public class LambdaCompilingSupplier extends AbstractLambdaCompilingProxy> implements Supplier { +public class LambdaCompilingSupplier extends AbstractLambdaCompilingProxy> implements SupplierProxy { public LambdaCompilingSupplier(Resource resource, SupplierCompiler compiler) { super(resource, compiler); diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java new file mode 100644 index 000000000..a391c6e28 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxFunction.java @@ -0,0 +1,44 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.util.function.Function; + +import reactor.core.publisher.Flux; + +/** + * {@link Function} implementation that wraps a target Function so that the target's + * simple input and output types will be wrapped as {@link Flux} instances. + * + * @author Mark Fisher + * + * @param input type of target function + * @param output type of target function + */ +public class FluxFunction implements Function, Flux> { + + private final Function function; + + public FluxFunction(Function function) { + this.function = function; + } + + @Override + public Flux apply(Flux input) { + return input.map(i->this.function.apply(i)); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java new file mode 100644 index 000000000..86e6cbe76 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FluxSupplier.java @@ -0,0 +1,57 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.time.Duration; +import java.util.function.Supplier; + +import reactor.core.publisher.Flux; + +/** + * {@link Supplier} implementation that wraps a target Supplier so that the + * target's simple output type will be wrapped in a {@link Flux} instance. + * If a {@link Duration} is provided, the Flux will produce output + * periodically, invoking the target Supplier's {@code get} method at each + * interval. If no Duration is provided, the target will be invoked only once. + * + * @author Mark Fisher + * + * @param output type of target supplier + */ +public class FluxSupplier implements Supplier> { + + private final Supplier supplier; + + private final Duration period; + + public FluxSupplier(Supplier supplier) { + this(supplier, null); + } + + public FluxSupplier(Supplier supplier, Duration period) { + this.supplier = supplier; + this.period = period; + } + + @Override + public Flux get() { + if (this.period != null) { + return Flux.interval(this.period).map(i->this.supplier.get()); + } + return Flux.just(this.supplier.get()); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionProxy.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionProxy.java new file mode 100644 index 000000000..34eeb141d --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionProxy.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.util.function.Function; + +/** + * @author Mark Fisher + * + * @param input type of target Function + * @param output type of target Function + */ +public interface FunctionProxy extends Function { + + default boolean isFluxFunction() { + return FunctionUtils.isFluxFunction(getTarget()); + } + + Function getTarget(); +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java new file mode 100644 index 000000000..20d793cc1 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/FunctionUtils.java @@ -0,0 +1,99 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.lang.invoke.SerializedLambda; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.springframework.util.ObjectUtils; +import org.springframework.util.ReflectionUtils; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +public abstract class FunctionUtils { + + private static final String FLUX_CLASS_NAME = Flux.class.getName(); + + private FunctionUtils() {} + + @SuppressWarnings("rawtypes") + public static boolean isFluxSupplier(Supplier supplier) { + if (supplier instanceof SupplierProxy) { + return ((SupplierProxy) supplier).isFluxSupplier(); + } + String[] types = getParameterizedTypeNames(supplier, Supplier.class); + if (ObjectUtils.isEmpty(types)) { + return true; + } + return (types[0].startsWith(FLUX_CLASS_NAME)); + } + + @SuppressWarnings("rawtypes") + public static boolean isFluxFunction(Function function) { + if (function instanceof FunctionProxy) { + return ((FunctionProxy) function).isFluxFunction(); + } + String[] types = getParameterizedTypeNames(function, Function.class); + if (ObjectUtils.isEmpty(types) || types.length != 2) { + return true; + } + return (types[0].startsWith(FLUX_CLASS_NAME) && types[1].startsWith(FLUX_CLASS_NAME)); + } + + private static String[] getParameterizedTypeNames(Object source, Class interfaceClass) { + Type[] genericInterfaces = source.getClass().getGenericInterfaces(); + for (Type genericInterface : genericInterfaces) { + if ((genericInterface instanceof ParameterizedType) + && interfaceClass.getTypeName().equals(((ParameterizedType) genericInterface).getRawType().getTypeName())) { + ParameterizedType type = (ParameterizedType) genericInterface; + Type[] args = type.getActualTypeArguments(); + if (args != null) { + String[] typeNames = new String[args.length]; + for (int i = 0; i < args.length; i++) { + typeNames[i] = args[i].getTypeName(); + } + return typeNames; + } + } + } + return getSerializedLambdaParameterizedTypeNames(source); + } + + private static String[] getSerializedLambdaParameterizedTypeNames(Object source) { + Method method = ReflectionUtils.findMethod(source.getClass(), "writeReplace"); + if (method == null) { + return null; + } + ReflectionUtils.makeAccessible(method); + SerializedLambda serializedLambda = (SerializedLambda) ReflectionUtils.invokeMethod(method, source); + String signature = serializedLambda.getImplMethodSignature().replaceAll("[()]", ""); + List typeNames = new ArrayList<>(); + for (String types : signature.split(";")) { + typeNames.add(types.substring(1).replace('/', '.')); + } + return typeNames.toArray(new String[typeNames.size()]); + } +} diff --git a/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/SupplierProxy.java b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/SupplierProxy.java new file mode 100644 index 000000000..c24a83d52 --- /dev/null +++ b/spring-cloud-function-core/src/main/java/org/springframework/cloud/function/support/SupplierProxy.java @@ -0,0 +1,33 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.support; + +import java.util.function.Supplier; + +/** + * @author Mark Fisher + * + * @param output type of target Supplier + */ +public interface SupplierProxy extends Supplier { + + default boolean isFluxSupplier() { + return FunctionUtils.isFluxSupplier(getTarget()); + } + + Supplier getTarget(); +} diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/SampleApplication.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/SampleApplication.java index 02adb1a52..54803ffb6 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/SampleApplication.java +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/SampleApplication.java @@ -23,22 +23,19 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.function.compiler.FunctionCompiler; import org.springframework.cloud.function.compiler.proxy.LambdaCompilingFunction; +import org.springframework.cloud.function.context.FunctionScan; import org.springframework.context.annotation.Bean; import org.springframework.core.io.ByteArrayResource; import reactor.core.publisher.Flux; +@FunctionScan @SpringBootApplication public class SampleApplication { @Bean - public Function, Flux> uppercase() { - return flux -> flux.map(value -> value.toUpperCase()); - } - - @Bean - public Supplier> words() { - return () -> Flux.fromArray(new String[] { "foo", "bar" }); + public Function uppercase() { + return value -> value.toUpperCase(); } @Bean @@ -47,18 +44,35 @@ public class SampleApplication { } @Bean - public FunctionCompiler compiler() { - return new FunctionCompiler<>(); + public Supplier hello() { + return () -> "hello"; } @Bean - public Function, Flux> compiledUppercase(FunctionCompiler, Flux> compiler) { - String lambda = "f -> f.map(o -> o.toString().toUpperCase())"; + public Supplier> words() { + return () -> Flux.fromArray(new String[] { "foo", "bar" }); + } + + @Bean + public Function compiledUppercase(FunctionCompiler compiler) { + String lambda = "s -> s.toUpperCase()"; + LambdaCompilingFunction function = new LambdaCompilingFunction<>(new ByteArrayResource(lambda.getBytes()), compiler); + function.setTypeParameterizations("String", "String"); + return function; + } + + @Bean + public Function, Flux> compiledLowercase(FunctionCompiler, Flux> compiler) { + String lambda = "f->f.map(o->o.toString().toLowerCase())"; return new LambdaCompilingFunction<>(new ByteArrayResource(lambda.getBytes()), compiler); } + @Bean + public FunctionCompiler compiler() { + return new FunctionCompiler<>(); + } + public static void main(String[] args) throws Exception { SpringApplication.run(SampleApplication.class, args); } - } diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/CharCounter.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/CharCounter.java new file mode 100644 index 000000000..a6ebb1422 --- /dev/null +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/CharCounter.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.functions; + +import java.util.function.Function; + +/** + * @author Mark Fisher + */ +public class CharCounter implements Function { + + @Override + public Integer apply(String word) { + return word.length(); + } +} diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Exclaimer.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Exclaimer.java new file mode 100644 index 000000000..6de425a1c --- /dev/null +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Exclaimer.java @@ -0,0 +1,32 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.functions; + +import java.util.function.Function; + +import reactor.core.publisher.Flux; + +/** + * @author Mark Fisher + */ +public class Exclaimer implements Function, Flux> { + + @Override + public Flux apply(Flux words) { + return words.map(word->word+"!!!"); + } +} diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Greeter.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Greeter.java new file mode 100644 index 000000000..e4f2bcae0 --- /dev/null +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/main/java/com/example/functions/Greeter.java @@ -0,0 +1,30 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example.functions; + +import java.util.function.Function; + +/** + * @author Mark Fisher + */ +public class Greeter implements Function { + + @Override + public String apply(String name) { + return "Hello " + name; + } +} diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/FunctionTests.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/FunctionTests.java new file mode 100644 index 000000000..2f599144d --- /dev/null +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/FunctionTests.java @@ -0,0 +1,84 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.junit.Test; + +import com.example.functions.CharCounter; +import com.example.functions.Exclaimer; +import com.example.functions.Greeter; + +import reactor.core.publisher.Flux; + +public class FunctionTests { + + private final SampleApplication functions = new SampleApplication(); + + @Test + public void testUppercase() { + String output = functions.uppercase().apply("foobar"); + assertEquals("FOOBAR", output); + } + + @Test + public void testLowercase() { + Flux output = functions.lowercase().apply(Flux.just("FOO", "BAR")); + List results = output.collectList().block(); + assertEquals(2, results.size()); + assertEquals("foo", results.get(0)); + assertEquals("bar", results.get(1)); + } + + @Test + public void testHello() { + String output = functions.hello().get(); + assertEquals("hello", output); + } + + @Test + public void testWords() { + Flux output = functions.words().get(); + List results = output.collectList().block(); + assertEquals(2, results.size()); + assertEquals("foo", results.get(0)); + assertEquals("bar", results.get(1)); + } + + @Test + public void testGreeter() { + assertEquals("Hello World", new Greeter().apply("World")); + } + + @Test + public void testExclaimer() { + Flux input = Flux.just("foo", "bar"); + Flux output = new Exclaimer().apply(input); + List results = output.collectList().block(); + assertEquals(2, results.size()); + assertEquals("foo!!!", results.get(0)); + assertEquals("bar!!!", results.get(1)); + } + + @Test + public void testCharCounter() { + assertEquals((Integer) 21, new CharCounter().apply("this is 21 chars long")); + } +} diff --git a/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/SampleApplicationTests.java b/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/SampleApplicationTests.java index 481ff0172..cf51efbed 100644 --- a/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/SampleApplicationTests.java +++ b/spring-cloud-function-samples/spring-cloud-function-sample/src/test/java/com/example/SampleApplicationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016 the original author or authors. + * Copyright 2016-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,44 +13,126 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.example; import static org.junit.Assert.assertEquals; import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; import reactor.core.publisher.Flux; +@RunWith(SpringRunner.class) +@SpringBootTest(properties = "spring.cloud.function.scan.packages=com.example.functions") public class SampleApplicationTests { - private final SampleApplication functions = new SampleApplication(); + @Test + public void contextLoads() { + } + + @Autowired + private Function uppercase; @Test public void testUppercase() { - Flux output = functions.uppercase().apply(Flux.just("foo", "bar")); - List results = output.collectList().block(); - assertEquals(2, results.size()); - assertEquals("FOO", results.get(0)); - assertEquals("BAR", results.get(1)); + String output = this.uppercase.apply("foobar"); + assertEquals("FOOBAR", output); } + @Autowired + private Function, Flux> lowercase; + @Test public void testLowercase() { - Flux output = functions.lowercase().apply(Flux.just("FOO", "BAR")); + Flux output = this.lowercase.apply(Flux.just("FOO", "BAR")); List results = output.collectList().block(); assertEquals(2, results.size()); assertEquals("foo", results.get(0)); assertEquals("bar", results.get(1)); } + @Autowired + private Supplier hello; + + @Test + public void testHello() { + String output = this.hello.get(); + assertEquals("hello", output); + } + + @Autowired + private Supplier> words; + @Test public void testWords() { - Flux output = functions.words().get(); + Flux output = this.words.get(); List results = output.collectList().block(); assertEquals(2, results.size()); assertEquals("foo", results.get(0)); assertEquals("bar", results.get(1)); } -} + + @Autowired + private Function compiledUppercase; + + @Test + public void testCompiledUppercase() { + String output = this.compiledUppercase.apply("foobar"); + assertEquals("FOOBAR", output); + } + + @Autowired + private Function, Flux> compiledLowercase; + + @Test + public void testCompiledLowercase() { + Flux input = Flux.just("FOO", "BAR"); + Flux output = this.compiledLowercase.apply(input); + List results = output.collectList().block(); + assertEquals(2, results.size()); + assertEquals("foo", results.get(0)); + assertEquals("bar", results.get(1)); + } + + // the following are contributed via @FunctionScan: + + @Autowired + private Function greeter; + + @Test + public void testGreeter() { + String greeting = this.greeter.apply("World"); + assertEquals("Hello World", greeting); + } + + @Autowired + private Function, Flux> exclaimer; + + @Test + public void testExclaimer() { + Flux input = Flux.just("foo", "bar"); + Flux output = this.exclaimer.apply(input); + List results = output.collectList().block(); + assertEquals(2, results.size()); + assertEquals("foo!!!", results.get(0)); + assertEquals("bar!!!", results.get(1)); + } + + @Autowired + private Function charCounter; + + @Test + public void testCharCounter() { + Integer length = this.charCounter.apply("the quick brown fox"); + assertEquals(new Integer(19), length); + } +} \ No newline at end of file diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java index 2dc666e4c..f34f6849b 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamConfiguration.java @@ -33,6 +33,8 @@ import org.springframework.boot.autoconfigure.condition.SpringBootCondition; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; import org.springframework.cloud.function.registry.FunctionCatalog; +import org.springframework.cloud.function.support.FluxFunction; +import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binder.Binder; import org.springframework.cloud.stream.messaging.Processor; @@ -43,6 +45,7 @@ import org.springframework.context.annotation.ConditionContext; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.ConfigurationCondition; import org.springframework.core.type.AnnotatedTypeMetadata; +import org.springframework.util.Assert; import org.springframework.util.StringUtils; import reactor.core.publisher.Flux; @@ -82,7 +85,8 @@ public class StreamConfiguration { @ConditionalOnProperty("spring.cloud.stream.bindings.input.destination") public AbstractFunctionInvoker invoker(FunctionCatalog registry) { String name = properties.getEndpoint(); - Function, Flux> function = registry.lookupFunction(name); + Function function = registry.lookupFunction(name); + Assert.notNull(function, "no such function: " + name); return new StreamListeningFunctionInvoker(function); } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 926a65df6..2fa0e57b6 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -19,25 +19,36 @@ package org.springframework.cloud.function.stream; import java.util.function.Function; import org.springframework.cloud.function.invoker.AbstractFunctionInvoker; +import org.springframework.cloud.function.support.FluxFunction; +import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Processor; +import org.springframework.util.Assert; import reactor.core.publisher.Flux; /** * @author Mark Fisher */ -public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> { +public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker, Flux> { - public StreamListeningFunctionInvoker(Function, Flux> function) { - super(function); + public StreamListeningFunctionInvoker(Function function) { + super(wrapIfNecessary(function)); } @StreamListener @Output(Processor.OUTPUT) - public Flux handle(@Input(Processor.INPUT) Flux input) { + public Flux handle(@Input(Processor.INPUT) Flux input) { return this.doInvoke(input); } + + private static Function, Flux> wrapIfNecessary(Function function) { + Assert.notNull(function, "Function must not be null"); + if (!FunctionUtils.isFluxFunction(function)) { + function = new FluxFunction(function); + } + return function; + } } diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java index 15d985cfb..cd2eb6f4b 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/SupplierInvokingMessageProducer.java @@ -18,9 +18,12 @@ package org.springframework.cloud.function.stream; import java.util.function.Supplier; +import org.springframework.cloud.function.support.FluxSupplier; +import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.util.Assert; import reactor.core.publisher.Flux; @@ -31,8 +34,12 @@ public class SupplierInvokingMessageProducer extends MessageProducerSupport { private final Supplier> supplier; - public SupplierInvokingMessageProducer(Supplier> supplier) { - this.supplier = supplier; + public SupplierInvokingMessageProducer(Supplier supplier) { + Assert.notNull(supplier, "Supplier must not be null"); + if (!FunctionUtils.isFluxSupplier(supplier)) { + supplier = new FluxSupplier<>(supplier); + } + this.supplier = (Supplier>) supplier; this.setOutputChannelName(Source.OUTPUT); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index c0aabcdfa..246f6c146 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -18,11 +18,15 @@ package org.springframework.cloud.function.web; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.function.registry.FunctionCatalog; +import org.springframework.cloud.function.support.FluxFunction; +import org.springframework.cloud.function.support.FluxSupplier; +import org.springframework.cloud.function.support.FunctionUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -54,6 +58,9 @@ public class FunctionController { @RequestBody Flux body) { Function function = functions.lookupFunction(name); if (function != null) { + if (!FunctionUtils.isFluxFunction(function)) { + function = new FluxFunction(function); + } @SuppressWarnings("unchecked") Flux result = (Flux) function.apply(body); return debug ? result.log() : result; @@ -68,8 +75,11 @@ public class FunctionController { @GetMapping(path = "/{name}") public Flux supplier(@PathVariable String name) { - @SuppressWarnings("unchecked") - Flux result = (Flux) functions.lookupSupplier(name).get(); + Supplier supplier = functions.lookupSupplier(name); + if (!FunctionUtils.isFluxSupplier(supplier)) { + supplier = new FluxSupplier(supplier); + } + Flux result = (Flux) supplier.get(); return debug ? result.log() : result; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java index bfcdcab14..d1b3f9cc7 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ReactorAutoConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2016 the original author or authors. + * Copyright 2013-2017 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,11 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication; import org.springframework.boot.autoconfigure.web.HttpMessageConverters; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.convert.support.DefaultConversionService; +import org.springframework.http.converter.ObjectToStringHttpMessageConverter; import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; import org.springframework.web.method.support.HandlerMethodReturnValueHandler; import org.springframework.web.servlet.config.annotation.WebMvcConfigurerAdapter; @@ -32,6 +35,7 @@ import reactor.core.publisher.Flux; /** * @author Dave Syer + * @author Mark Fisher */ @Configuration @ConditionalOnWebApplication @@ -39,7 +43,12 @@ import reactor.core.publisher.Flux; public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { @Autowired - private FluxReturnValueHandler returnValueHandler; + private ApplicationContext context; + + @Bean + public ObjectToStringHttpMessageConverter objectToStringHttpMessageConverter() { + return new ObjectToStringHttpMessageConverter(new DefaultConversionService()); + } @Bean public FluxReturnValueHandler fluxReturnValueHandler( @@ -49,6 +58,7 @@ public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { @Configuration protected static class FluxMessageConverterConfiguration { + @Bean public FluxHttpMessageConverter fluxHttpMessageConverter() { return new FluxHttpMessageConverter(); @@ -58,7 +68,6 @@ public class ReactorAutoConfiguration extends WebMvcConfigurerAdapter { @Override public void addReturnValueHandlers( List returnValueHandlers) { - returnValueHandlers.add(returnValueHandler); + returnValueHandlers.add(context.getBean(FluxReturnValueHandler.class)); } - }