removed FunctionRegistry

This commit is contained in:
markfisher
2017-01-11 21:23:02 -05:00
parent cfd416590d
commit 0fb31d6d2b
12 changed files with 51 additions and 277 deletions

View File

@@ -40,6 +40,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -14,7 +14,7 @@
* limitations under the License.
*/
package org.springframework.cloud.function.support;
package org.springframework.cloud.function.compiler;
import java.io.InputStreamReader;
import java.util.function.Function;

View File

@@ -22,15 +22,20 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-compiler</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.0.4.RELEASE</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>

View File

@@ -25,7 +25,7 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.cloud.function.invoker.FunctionInvokingRunnable;
import org.springframework.cloud.function.registry.FunctionRegistry;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.util.Assert;
@@ -35,32 +35,32 @@ import org.springframework.util.Assert;
*/
public class LocalFunctionGateway implements FunctionGateway {
private final FunctionRegistry registry;
private final FunctionCatalog catalog;
private final TaskScheduler scheduler;
public LocalFunctionGateway(FunctionRegistry registry, TaskScheduler scheduler) {
Assert.notNull(registry, "FunctionRegistry must not be null");
public LocalFunctionGateway(FunctionCatalog catalog, TaskScheduler scheduler) {
Assert.notNull(catalog, "FunctionCatalog must not be null");
Assert.notNull(scheduler, "TaskScheduler must not be null");
this.registry = registry;
this.catalog = catalog;
this.scheduler = scheduler;
}
@Override
public <T, R> R invoke(String functionName, T request) {
Function<T, R> function = this.registry.lookupFunction(functionName);
Function<T, R> function = this.catalog.lookupFunction(functionName);
return function.apply(request);
}
@Override
public <T, R> void schedule(String functionName, Trigger trigger, Supplier<T> supplier, Consumer<R> consumer) {
Function<T, R> function = this.registry.lookupFunction(functionName);
Function<T, R> function = this.catalog.lookupFunction(functionName);
this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger);
}
@Override
public <T, R> void subscribe(Publisher<T> publisher, String functionName, final Consumer<R> consumer) {
final Function<T, R> function = this.registry.lookupFunction(functionName);
final Function<T, R> function = this.catalog.lookupFunction(functionName);
publisher.subscribe(new Subscriber<T>() {
@Override

View File

@@ -1,150 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.registry;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.cloud.function.compiler.CompiledFunctionFactory;
import org.springframework.cloud.function.compiler.ConsumerCompiler;
import org.springframework.cloud.function.compiler.ConsumerFactory;
import org.springframework.cloud.function.compiler.FunctionCompiler;
import org.springframework.cloud.function.compiler.FunctionFactory;
import org.springframework.cloud.function.compiler.SupplierCompiler;
import org.springframework.cloud.function.compiler.SupplierFactory;
import org.springframework.cloud.function.compiler.java.SimpleClassLoader;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
*/
public abstract class AbstractFunctionRegistry implements FunctionRegistry {
private static final Map<String, ConsumerFactory<?>> CONSUMER_FACTORY_CACHE = new HashMap<>();
private static final Map<String, FunctionFactory<?, ?>> FUNCTION_FACTORY_CACHE = new HashMap<>();
private static final Map<String, SupplierFactory<?>> SUPPLIER_FACTORY_CACHE = new HashMap<>();
private final ConsumerCompiler consumerCompiler = new ConsumerCompiler<>();
private final FunctionCompiler functionCompiler = new FunctionCompiler<>();
private final SupplierCompiler supplierCompiler = new SupplierCompiler<>();
private final SimpleClassLoader classLoader = new SimpleClassLoader(AbstractFunctionRegistry.class.getClassLoader());
@Override
public <T> Consumer<T> lookupConsumer(String name) {
@SuppressWarnings("unchecked")
ConsumerFactory<T> factory = (ConsumerFactory<T>) CONSUMER_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getResult();
}
return this.doLookupConsumer(name);
}
@Override
public final <T, R> Function<T, R> lookupFunction(String name) {
@SuppressWarnings("unchecked")
FunctionFactory<T, R> factory = (FunctionFactory<T, R>) FUNCTION_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getResult();
}
return this.doLookupFunction(name);
}
@Override
public <T> Supplier<T> lookupSupplier(String name) {
@SuppressWarnings("unchecked")
SupplierFactory<T> factory = (SupplierFactory<T>) SUPPLIER_FACTORY_CACHE.get(name);
if (factory != null) {
return factory.getResult();
}
return this.doLookupSupplier(name);
}
protected abstract <T> Consumer<T> doLookupConsumer(String name);
protected abstract <T, R> Function<T, R> doLookupFunction(String name);
protected abstract <T> Supplier<T> doLookupSupplier(String name);
protected <T> CompiledFunctionFactory<Consumer<T>> compileConsumer(String name, String code) {
return this.consumerCompiler.compile(name, code);
}
protected <T, R> CompiledFunctionFactory<Function<T, R>> compileFunction(String name, String code) {
return this.functionCompiler.compile(name, code);
}
protected <T> CompiledFunctionFactory<Supplier<T>> compileSupplier(String name, String code) {
return this.supplierCompiler.compile(name, code);
}
@SuppressWarnings("unchecked")
protected <T> Consumer<T> deserializeConsumer(final String name, byte[] bytes) {
String className = formatClassName(name, Consumer.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
ConsumerFactory<T> factory = ((ConsumerFactory<T>) factoryClass.newInstance());
CONSUMER_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize Consumer", e);
}
}
@SuppressWarnings("unchecked")
protected <T, R> Function<T, R> deserializeFunction(final String name, byte[] bytes) {
String className = formatClassName(name, Function.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
FunctionFactory<T, R> factory = ((FunctionFactory<T, R>) factoryClass.newInstance());
FUNCTION_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize Function", e);
}
}
@SuppressWarnings("unchecked")
protected <T> Supplier<T> deserializeSupplier(final String name, byte[] bytes) {
String className = formatClassName(name, Supplier.class);
Class<?> factoryClass = this.classLoader.defineClass(className, bytes);
try {
SupplierFactory<T> factory = ((SupplierFactory<T>) factoryClass.newInstance());
SUPPLIER_FACTORY_CACHE.put(name, factory);
return factory.getResult();
}
catch (InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("failed to deserialize Supplier", e);
}
}
private String formatClassName(String name, Class<?> type) {
Assert.hasLength(name, "name must not be empty");
String firstLetter = name.substring(0, 1).toUpperCase();
String upperCasedName = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter;
return String.format("%s.%s%sFactory", FunctionCompiler.class.getPackage().getName(), upperCasedName, type.getSimpleName());
}
}

View File

@@ -1,30 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.registry;
/**
* @author Mark Fisher
*/
public interface FunctionRegistry extends FunctionCatalog {
void registerConsumer(String name, String consumer);
void registerFunction(String name, String function);
void registerSupplier(String name, String supplier);
}

View File

@@ -1,67 +0,0 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.registry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author Mark Fisher
*/
public class InMemoryFunctionRegistry extends AbstractFunctionRegistry {
private final ConcurrentHashMap<String, Consumer<?>> consumerMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Function<?, ?>> functionMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Supplier<?>> supplierMap = new ConcurrentHashMap<>();
@Override
@SuppressWarnings("unchecked")
protected Consumer<?> doLookupConsumer(String name) {
return this.consumerMap.get(name);
}
@Override
@SuppressWarnings("unchecked")
public Function<?, ?> doLookupFunction(String name) {
return this.functionMap.get(name);
}
@Override
@SuppressWarnings("unchecked")
protected Supplier<?> doLookupSupplier(String name) {
return this.supplierMap.get(name);
}
@Override
public void registerConsumer(String name, String consumer) {
this.consumerMap.put(name, this.compileConsumer(name, consumer).getResult());
}
@Override
public void registerFunction(String name, String function) {
this.functionMap.put(name, this.compileFunction(name, function).getResult());
}
@Override
public void registerSupplier(String name, String supplier) {
this.supplierMap.put(name, this.compileSupplier(name, supplier).getResult());
}
}

View File

@@ -19,12 +19,14 @@ package org.springframework.cloud.function.gateway;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
import org.springframework.cloud.function.registry.FunctionRegistry;
import org.springframework.cloud.function.registry.InMemoryFunctionRegistry;
import org.springframework.cloud.function.registry.FunctionCatalog;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import reactor.core.publisher.Flux;
@@ -34,19 +36,35 @@ import reactor.core.publisher.Flux;
*/
public class LocalFunctionGatewayTests {
private final FunctionRegistry registry = new InMemoryFunctionRegistry();
private final FunctionCatalog catalog = new FunctionCatalog() {
@Override
public <T> Supplier<T> lookupSupplier(String name) {
return null;
}
@Override
@SuppressWarnings("unchecked")
public Function<Flux<String>, Flux<String>> lookupFunction(String name) {
return ("uppercase".equals(name) ? f->f.map(s->s.toString().toUpperCase()) : null);
}
@Override
public <T> Consumer<T> lookupConsumer(String name) {
return null;
}
};
private final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
@Before
public void init() {
registry.registerFunction("uppercase", "f->f.map(s->s.toString().toUpperCase())");
this.scheduler.initialize();
}
@Test
public void test() {
LocalFunctionGateway gateway = new LocalFunctionGateway(registry, scheduler);
LocalFunctionGateway gateway = new LocalFunctionGateway(catalog, scheduler);
Flux<String> output = gateway.invoke("uppercase", Flux.just("foo", "bar"));
List<String> results = output.collectList().block();
assertEquals("FOO", results.get(0));

View File

@@ -31,7 +31,7 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>
<artifactId>spring-cloud-function-compiler</artifactId>
<version>${spring-cloud-function.version}</version>
</dependency>
<dependency>

View File

@@ -21,7 +21,7 @@ import java.util.function.Supplier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.compiler.FunctionCompiler;
import org.springframework.cloud.function.support.LambdaCompilingFunction;
import org.springframework.cloud.function.compiler.LambdaCompilingFunction;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ByteArrayResource;

View File

@@ -23,11 +23,6 @@
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-web-reactive</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-context</artifactId>

View File

@@ -23,10 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.function.Function;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.apache.commons.io.Charsets;
import org.reactivestreams.Publisher;
@@ -44,6 +40,9 @@ import org.springframework.util.MimeType;
import org.springframework.util.StreamUtils;
import org.springframework.web.reactive.config.WebReactiveConfigurer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import reactor.core.publisher.Flux;
/**