add support for Supplier and Consumer

This commit is contained in:
markfisher
2016-10-14 11:07:37 -04:00
parent 971cc1ebd8
commit b0db0233be
32 changed files with 892 additions and 183 deletions

View File

@@ -24,11 +24,25 @@ import org.springframework.cloud.function.registry.FileSystemFunctionRegistry;
public class FunctionRegistrar {
public static void main(String[] args) {
if (args.length != 2) {
System.err.println("USAGE: java FunctionRegistrar functionName functionBody");
String usage = "USAGE: java FunctionRegistrar [supplier|function|consumer] name lambda";
if (args.length != 3) {
System.err.println(usage);
System.exit(1);
}
FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry();
registry.register(args[0], args[1]);
String type = args[0];
if ("supplier".equalsIgnoreCase(type)) {
registry.registerSupplier(args[1], args[2]);
}
else if ("function".equalsIgnoreCase(type)) {
registry.registerFunction(args[1], args[2]);
}
else if ("consumer".equalsIgnoreCase(type)) {
registry.registerConsumer(args[1], args[2]);
}
else {
System.err.println(usage);
System.exit(1);
}
}
}

View File

@@ -48,19 +48,19 @@ public class LocalFunctionGateway implements FunctionGateway {
@Override
public <T, R> R invoke(String functionName, T request) {
Function<T, R> function = this.registry.lookup(functionName);
Function<T, R> function = this.registry.lookupFunction(functionName);
return function.apply(request);
}
@Override
public <T, R> void schedule(String functionName, Trigger trigger, Supplier<T> supplier, Consumer<R> consumer) {
Function<T, R> function = this.registry.lookup(functionName);
Function<T, R> function = this.registry.lookupFunction(functionName);
this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger);
}
@Override
public <T, R> void subscribe(Publisher<T> publisher, String functionName, final Consumer<R> consumer) {
final Function<T, R> function = this.registry.lookup(functionName);
final Function<T, R> function = this.registry.lookupFunction(functionName);
publisher.subscribe(new Subscriber<T>() {
@Override

View File

@@ -18,11 +18,17 @@ package org.springframework.cloud.function.registry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.cloud.function.compiler.CompiledFunctionFactory;
import org.springframework.cloud.function.compiler.ConsumerCompiler;
import org.springframework.cloud.function.compiler.ConsumerFactory;
import org.springframework.cloud.function.compiler.FunctionCompiler;
import org.springframework.cloud.function.compiler.FunctionFactory;
import org.springframework.cloud.function.compiler.SupplierCompiler;
import org.springframework.cloud.function.compiler.SupplierFactory;
import org.springframework.cloud.function.compiler.java.SimpleClassLoader;
import org.springframework.util.Assert;
@@ -31,53 +37,125 @@ import org.springframework.util.Assert;
*/
public abstract class AbstractFunctionRegistry implements FunctionRegistry {
private static final Map<String, FunctionFactory<?, ?>> FACTORY_CACHE = new HashMap<>();
private static final Map<String, ConsumerFactory<?>> CONSUMER_FACTORY_CACHE = new HashMap<>();
private final FunctionCompiler compiler = new FunctionCompiler();
private static final Map<String, FunctionFactory<?, ?>> FUNCTION_FACTORY_CACHE = new HashMap<>();
private static final Map<String, SupplierFactory<?>> SUPPLIER_FACTORY_CACHE = new HashMap<>();
private final ConsumerCompiler consumerCompiler = new ConsumerCompiler<>();
private final FunctionCompiler functionCompiler = new FunctionCompiler<>();
private final SupplierCompiler supplierCompiler = new SupplierCompiler<>();
private final SimpleClassLoader classLoader = new SimpleClassLoader(AbstractFunctionRegistry.class.getClassLoader());
@Override
@SuppressWarnings("unchecked")
public <T, R> Function<T, R> compose(String... functionNames) {
public <T, R> Function<T, R> composeFunction(String... functionNames) {
@SuppressWarnings("rawtypes")
Function function = this.lookup(functionNames[0]);
Function function = this.lookupFunction(functionNames[0]);
for (int i = 1; i < functionNames.length; i++) {
function = function.andThen(this.lookup(functionNames[i]));
function = function.andThen(this.lookupFunction(functionNames[i]));
}
return function;
}
@Override
public final <T, R> Function<T, R> lookup(String name) {
public <T> Consumer<T> lookupConsumer(String name) {
@SuppressWarnings("unchecked")
FunctionFactory<T, R> factory = (FunctionFactory<T, R>) FACTORY_CACHE.get(name);
ConsumerFactory<T> factory = (ConsumerFactory<T>) CONSUMER_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getFunction();
return factory.getResult();
}
return this.doLookup(name);
return this.doLookupConsumer(name);
}
protected abstract <T, R> Function<T, R> doLookup(String name);
@Override
public final <T, R> Function<T, R> lookupFunction(String name) {
@SuppressWarnings("unchecked")
FunctionFactory<T, R> factory = (FunctionFactory<T, R>) FUNCTION_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getResult();
}
return this.doLookupFunction(name);
}
protected <T, R> CompiledFunctionFactory<T, R> compile(String name, String code) {
return this.compiler.compile(name, code);
@Override
public <T> Supplier<T> lookupSupplier(String name) {
@SuppressWarnings("unchecked")
SupplierFactory<T> factory = (SupplierFactory<T>) SUPPLIER_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getResult();
}
return this.doLookupSupplier(name);
}
protected abstract <T> Consumer<T> doLookupConsumer(String name);
protected abstract <T, R> Function<T, R> doLookupFunction(String name);
protected abstract <T> Supplier<T> doLookupSupplier(String name);
protected <T> CompiledFunctionFactory<Consumer<T>> compileConsumer(String name, String code) {
return this.consumerCompiler.compile(name, code);
}
protected <T, R> CompiledFunctionFactory<Function<T, R>> compileFunction(String name, String code) {
return this.functionCompiler.compile(name, code);
}
protected <T> CompiledFunctionFactory<Supplier<T>> compileSupplier(String name, String code) {
return this.supplierCompiler.compile(name, code);
}
@SuppressWarnings("unchecked")
protected <T, R> Function<T, R> deserialize(final String name, byte[] bytes) {
Assert.hasLength(name, "name must not be empty");
String firstLetter = name.substring(0, 1).toUpperCase();
String upperCasedName = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter;
String className = String.format("%s.%sFunctionFactory", FunctionCompiler.class.getPackage().getName(), upperCasedName);
protected <T> Consumer<T> deserializeConsumer(final String name, byte[] bytes) {
String className = formatClassName(name, Consumer.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
ConsumerFactory<T> factory = ((ConsumerFactory<T>) factoryClass.newInstance());
CONSUMER_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize Consumer", e);
}
}
@SuppressWarnings("unchecked")
protected <T, R> Function<T, R> deserializeFunction(final String name, byte[] bytes) {
String className = formatClassName(name, Function.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
FunctionFactory<T, R> factory = ((FunctionFactory<T, R>) factoryClass.newInstance());
FACTORY_CACHE.put(name, factory);
return factory.getFunction();
FUNCTION_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize function", e);
throw new IllegalArgumentException("failed to deserialize Function", e);
}
}
@SuppressWarnings("unchecked")
protected <T> Supplier<T> deserializeSupplier(final String name, byte[] bytes) {
String className = formatClassName(name, Supplier.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
SupplierFactory<T> factory = ((SupplierFactory<T>) factoryClass.newInstance());
SUPPLIER_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize Supplier", e);
}
}
private String formatClassName(String name, Class<?> type) {
Assert.hasLength(name, "name must not be empty");
String firstLetter = name.substring(0, 1).toUpperCase();
String upperCasedName = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter;
return String.format("%s.%s%sFactory", FunctionCompiler.class.getPackage().getName(), upperCasedName, type.getSimpleName());
}
}

View File

@@ -18,7 +18,9 @@ package org.springframework.cloud.function.registry;
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.cloud.function.compiler.CompiledFunctionFactory;
import org.springframework.util.Assert;
@@ -29,7 +31,17 @@ import org.springframework.util.FileCopyUtils;
*/
public class FileSystemFunctionRegistry extends AbstractFunctionRegistry {
private final File directory;
private static final String CONSUMER_DIRECTORY = "consumers";
private static final String FUNCTION_DIRECTORY = "functions";
private static final String SUPPLIER_DIRECTORY = "suppliers";
private final File consumerDirectory;
private final File functionDirectory;
private final File supplierDirectory;
public FileSystemFunctionRegistry() {
this(new File("/tmp/function-registry"));
@@ -44,28 +56,80 @@ public class FileSystemFunctionRegistry extends AbstractFunctionRegistry {
Assert.isTrue(directory.isDirectory(),
String.format("%s is not a directory.", directory.getAbsolutePath()));
}
this.directory = directory;
this.consumerDirectory = new File(directory, CONSUMER_DIRECTORY);
this.functionDirectory = new File(directory, FUNCTION_DIRECTORY);
this.supplierDirectory = new File(directory, SUPPLIER_DIRECTORY);
this.consumerDirectory.mkdir();
this.functionDirectory.mkdir();
this.supplierDirectory.mkdir();
}
@Override
public <T, R> Function<T, R> doLookup(String name) {
public <T> Consumer<T> doLookupConsumer(String name) {
try {
byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.directory, fileName(name)));
return this.deserialize(name, bytes);
byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.consumerDirectory, fileName(name)));
return this.deserializeConsumer(name, bytes);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to lookup function: %s", name), e);
throw new IllegalArgumentException(String.format("failed to lookup Consumer: %s", name), e);
}
}
public void register(String name, String function) {
CompiledFunctionFactory<?, ?> factory = this.compile(name, function);
File file = new File(this.directory, fileName(name));
@Override
public <T, R> Function<T, R> doLookupFunction(String name) {
try {
byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.functionDirectory, fileName(name)));
return this.deserializeFunction(name, bytes);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to lookup Function: %s", name), e);
}
}
@Override
public <T> Supplier<T> doLookupSupplier(String name) {
try {
byte[] bytes = FileCopyUtils.copyToByteArray(new File(this.supplierDirectory, fileName(name)));
return this.deserializeSupplier(name, bytes);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to lookup Supplier: %s", name), e);
}
}
@Override
public void registerConsumer(String name, String consumer) {
CompiledFunctionFactory<?> factory = this.compileConsumer(name, consumer);
File file = new File(this.consumerDirectory, fileName(name));
try {
FileCopyUtils.copy(factory.getGeneratedClassBytes(), file);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to register function: %s", name), e);
throw new IllegalArgumentException(String.format("failed to register Consumer: %s", name), e);
}
}
@Override
public void registerFunction(String name, String function) {
CompiledFunctionFactory<?> factory = this.compileFunction(name, function);
File file = new File(this.functionDirectory, fileName(name));
try {
FileCopyUtils.copy(factory.getGeneratedClassBytes(), file);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to register Function: %s", name), e);
}
}
@Override
public void registerSupplier(String name, String supplier) {
CompiledFunctionFactory<?> factory = this.compileSupplier(name, supplier);
File file = new File(this.supplierDirectory, fileName(name));
try {
FileCopyUtils.copy(factory.getGeneratedClassBytes(), file);
}
catch (IOException e) {
throw new IllegalArgumentException(String.format("failed to register Supplier: %s", name), e);
}
}

View File

@@ -16,17 +16,26 @@
package org.springframework.cloud.function.registry;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Mark Fisher
*/
public interface FunctionRegistry {
void register(String name, String function);
void registerConsumer(String name, String consumer);
<T, R> Function<T, R> lookup(String name);
void registerFunction(String name, String function);
<T, R> Function<T, R> compose(String... functionNames);
void registerSupplier(String name, String supplier);
<T> Consumer<T> lookupConsumer(String name);
<T, R> Function<T, R> lookupFunction(String name);
<T, R> Function<T, R> composeFunction(String... functionNames);
<T> Supplier<T> lookupSupplier(String name);
}

View File

@@ -17,23 +17,51 @@
package org.springframework.cloud.function.registry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Mark Fisher
*/
public class InMemoryFunctionRegistry extends AbstractFunctionRegistry {
private final ConcurrentHashMap<String, Function<?, ?>> map = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Consumer<?>> consumerMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Function<?, ?>> functionMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Supplier<?>> supplierMap = new ConcurrentHashMap<>();
@Override
@SuppressWarnings("unchecked")
public Function<?, ?> doLookup(String name) {
return this.map.get(name);
protected Consumer<?> doLookupConsumer(String name) {
return this.consumerMap.get(name);
}
@Override
public void register(String name, String function) {
this.map.put(name, this.compile(name, function).getFunction());
@SuppressWarnings("unchecked")
public Function<?, ?> doLookupFunction(String name) {
return this.functionMap.get(name);
}
@Override
@SuppressWarnings("unchecked")
protected Supplier<?> doLookupSupplier(String name) {
return this.supplierMap.get(name);
}
@Override
public void registerConsumer(String name, String consumer) {
this.consumerMap.put(name, this.compileConsumer(name, consumer).getResult());
}
@Override
public void registerFunction(String name, String function) {
this.functionMap.put(name, this.compileFunction(name, function).getResult());
}
@Override
public void registerSupplier(String name, String supplier) {
this.supplierMap.put(name, this.compileSupplier(name, supplier).getResult());
}
}

View File

@@ -40,6 +40,7 @@ public class LocalFunctionGatewayTests {
@Before
public void init() {
registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())");
this.scheduler.initialize();
}

View File

@@ -43,10 +43,10 @@ public class FileSystemFunctionRegistryTests {
}
@Test
public void registerAndLookup() throws IOException {
public void registerAndLookupFunction() throws IOException {
FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(this.directory);
registry.register("uppercase", "f->f.map(s->s.toString().toUpperCase())");
Function<Flux<String>, Flux<String>> function = registry.lookup("uppercase");
registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())");
Function<Flux<String>, Flux<String>> function = registry.lookupFunction("uppercase");
Flux<String> output = function.apply(Flux.just("foo", "bar"));
List<String> results = output.collectList().block();
assertEquals("FOO", results.get(0));
@@ -54,11 +54,11 @@ public class FileSystemFunctionRegistryTests {
}
@Test
public void compose() throws IOException {
public void composeFunction() throws IOException {
FileSystemFunctionRegistry registry = new FileSystemFunctionRegistry(this.directory);
registry.register("uppercase", "f->f.map(s->s.toString().toUpperCase())");
registry.register("exclaim", "f->f.map(s->s+\"!!!\")");
Function<Flux<String>, Flux<String>> function = registry.compose("uppercase", "exclaim");
registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())");
registry.registerFunction("exclaim", "f->f.map(s->s+\"!!!\")");
Function<Flux<String>, Flux<String>> function = registry.composeFunction("uppercase", "exclaim");
Flux<String> output = function.apply(Flux.just("foo", "bar"));
List<String> results = output.collectList().block();
assertEquals("FOO!!!", results.get(0));