GH-293 Enhanced endpoint mapping support for functional form context configuration

- Enhanced HTTP endpoint mapping support for 'functional form' context configuration ensuring it can register multiple endpoint to maintain the same behaviour as with regular application context
- Additional consolidation around Function Catalog
- Added identical test for functional and non-functional form endpoint configuration.

Resolves #293
This commit is contained in:
Oleg Zhurakousky
2019-03-19 07:51:24 +01:00
parent 3b4e9616ae
commit 87a878879c
11 changed files with 418 additions and 243 deletions

View File

@@ -47,6 +47,7 @@ import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -186,6 +187,28 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
this.environment = environment;
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
String functionName = function == null ? null
: this.lookupFunctionName(function);
if (StringUtils.hasText(functionName)) {
FunctionRegistration<?> registration = new FunctionRegistration<Object>(
function, functionName);
FunctionType functionType = this.findType(registration);
return registration.type(functionType.getType());
}
return null;
}
@Override
public <T> void register(FunctionRegistration<T> functionRegistration) {
Assert.notEmpty(functionRegistration.getNames(),
"'registration' must contain at least one name before it is registered in catalog.");
register(functionRegistration, functionRegistration.getNames().iterator().next());
}
/**
* Registers function wrapped by the provided FunctionRegistration with
* this FunctionRegistry.
@@ -236,16 +259,13 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
protected FunctionType findType(FunctionRegistration<?> functionRegistration) {
FunctionType functionType = functionRegistration.getType();
if (functionType != null) {
return functionType;
}
throw new IllegalStateException(
"Unless FunctionType is already available in FunctionRegistration, "
+ "this operation must be overriden "
+ "by the implementation of the FunctionRegistry.");
String name = this.lookupFunctionName(functionRegistration.getTarget());
return functionRegistration.getType() != null
? functionRegistration.getType()
: this.getFunctionType(name);
}
protected void addSupplier(String name, Supplier<?> supplier) {
this.suppliers.put(name, supplier);
}
@@ -327,8 +347,8 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
else if (composedFunction instanceof Supplier) {
this.addSupplier(name, (Supplier<?>) composedFunction);
}
// this.register(composedRegistration);
}
}

View File

@@ -17,14 +17,10 @@
package org.springframework.cloud.function.context.catalog;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.util.Assert;
/**
@@ -34,72 +30,21 @@ import org.springframework.util.Assert;
*/
public class InMemoryFunctionCatalog extends AbstractComposableFunctionRegistry {
private final Map<Object, FunctionRegistration<?>> registrations;
public InMemoryFunctionCatalog() {
this(Collections.emptySet());
}
public InMemoryFunctionCatalog(Set<FunctionRegistration<?>> registrations) {
Assert.notNull(registrations, "'registrations' must not be null");
this.registrations = new HashMap<>();
registrations.stream().forEach(reg -> register(reg));
}
@Override
public FunctionRegistration<?> getRegistration(Object function) {
return this.registrations.get(function);
protected FunctionType findType(FunctionRegistration<?> functionRegistration) {
FunctionType functionType = super.findType(functionRegistration);
if (functionType == null) {
functionType = new FunctionType(functionRegistration.getTarget().getClass());
}
return functionType;
}
@Override
public <T> void register(FunctionRegistration<T> functionRegistration) {
Assert.notEmpty(functionRegistration.getNames(),
"'registration' must contain at least one name before it is registered in catalog.");
// TODO should we just delegate to wrap(..)????
// wrap(functionRegistration, functionRegistration.getNames().iterator().next());
Class<?> type = Object.class;
if (functionRegistration.getTarget() instanceof Function) {
type = Function.class;
}
else if (functionRegistration.getTarget() instanceof Supplier) {
type = Supplier.class;
}
else if (functionRegistration.getTarget() instanceof Consumer) {
type = Consumer.class;
}
FunctionRegistrationEvent event = new FunctionRegistrationEvent(this, type,
functionRegistration.getNames());
this.registrations.put(functionRegistration.getTarget(), functionRegistration);
FunctionRegistration<T> wrapped = functionRegistration.wrap();
if (wrapped != functionRegistration) {
functionRegistration = wrapped;
this.registrations.put(wrapped.getTarget(), wrapped);
if (type == Consumer.class) {
type = Function.class;
}
}
for (String name : functionRegistration.getNames()) {
addType(name, functionRegistration.getType());
addName(functionRegistration.getTarget(), name);
if (functionRegistration.getTarget() instanceof Function) {
this.addFunction(name, (Function<?, ?>) functionRegistration.getTarget());
}
else if (functionRegistration.getTarget() instanceof Consumer) {
this.addConsumer(name, (Consumer<?>) functionRegistration.getTarget());
}
else {
this.addSupplier(name, (Supplier<?>) functionRegistration.getTarget());
}
}
this.publishEvent(event);
}
private void publishEvent(Object event) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(event);
}
}
}

View File

@@ -62,8 +62,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.type.StandardMethodMetadata;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* @author Dave Syer
@@ -93,26 +91,6 @@ public class ContextFunctionCatalogAutoConfiguration {
private ConfigurableListableBeanFactory beanFactory;
@Override
public FunctionRegistration<?> getRegistration(Object function) {
String functionName = function == null ? null
: this.lookupFunctionName(function);
if (StringUtils.hasText(functionName)) {
FunctionRegistration<?> registration = new FunctionRegistration<Object>(
function, functionName);
FunctionType functionType = this.findType(registration);
return registration.type(functionType.getType());
}
return null;
}
@Override
public <T> void register(FunctionRegistration<T> functionRegistration) {
Assert.notEmpty(functionRegistration.getNames(),
"'registration' must contain at least one name before it is registered in catalog.");
register(functionRegistration, functionRegistration.getNames().iterator().next());
}
/**
* Will collect all suppliers, functions, consumers and function registration as
* late as possible in the lifecycle.
@@ -160,13 +138,9 @@ public class ContextFunctionCatalogAutoConfiguration {
@Override
protected FunctionType findType(FunctionRegistration<?> functionRegistration) {
if (functionRegistration.getType() != null) {
return functionRegistration.getType();
}
String name = this.lookupFunctionName(functionRegistration.getTarget());
FunctionType functionType = this.getFunctionType(name);
FunctionType functionType = super.findType(functionRegistration);
if (functionType == null) {
String name = this.lookupFunctionName(functionRegistration.getTarget());
functionType = functionByNameExist(name)
? new FunctionType(functionRegistration.getTarget().getClass())
: new FunctionType(

View File

@@ -44,7 +44,7 @@ public class InMemoryFunctionCatalogTests {
InMemoryFunctionCatalog catalog = new InMemoryFunctionCatalog();
catalog.register(registration);
FunctionRegistration<?> registration2 = catalog.getRegistration(function);
assertThat(registration2).isSameAs(registration);
assertThat(registration2.getType()).isEqualTo(registration.getType());
}
@Test

View File

@@ -50,7 +50,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
* @author Oleg Zhurakousky
*/
public class ContextFunctionCatalogInitializerTests {
@@ -163,7 +163,7 @@ public class ContextFunctionCatalogInitializerTests {
assertThat(bean).isNotSameAs(function);
assertThat(this.inspector.getRegistration(function)).isNotNull();
assertThat(this.inspector.getRegistration(function).getType()).isEqualTo(
FunctionType.from(String.class).to(String.class).wrap(Flux.class));
FunctionType.from(String.class).to(String.class));
}
@Test

View File

@@ -22,6 +22,7 @@ import org.junit.Test;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.catalog.InMemoryFunctionCatalog;
import org.springframework.cloud.function.core.FluxFunction;
import static org.assertj.core.api.Assertions.assertThat;
@@ -38,7 +39,7 @@ public class SingleEntryFunctionRegistryTests {
this.delegate.register(new FunctionRegistration<Foos>(new Foos(), "foo"));
SingleEntryFunctionRegistry registry = new SingleEntryFunctionRegistry(
this.delegate, "foo");
assertThat((Foos) registry.lookup("foo")).isInstanceOf(Function.class);
assertThat(((FluxFunction<?, ?>) registry.lookup("")).getTarget()).isInstanceOf(Foos.class);
}
@Test
@@ -46,7 +47,7 @@ public class SingleEntryFunctionRegistryTests {
this.delegate.register(new FunctionRegistration<Foos>(new Foos(), "foo"));
SingleEntryFunctionRegistry registry = new SingleEntryFunctionRegistry(
this.delegate, "foo");
assertThat((Foos) registry.lookup("bar")).isNull();
assertThat(((FluxFunction<?, ?>) registry.lookup("")).getTarget()).isInstanceOf(Foos.class);
}
@Test
@@ -54,7 +55,7 @@ public class SingleEntryFunctionRegistryTests {
this.delegate.register(new FunctionRegistration<Foos>(new Foos(), ""));
SingleEntryFunctionRegistry registry = new SingleEntryFunctionRegistry(
this.delegate, "");
assertThat((Foos) registry.lookup("")).isInstanceOf(Function.class);
assertThat(((FluxFunction<?, ?>) registry.lookup("")).getTarget()).isInstanceOf(Foos.class);
}
@Test
@@ -62,7 +63,7 @@ public class SingleEntryFunctionRegistryTests {
this.delegate.register(new FunctionRegistration<Foos>(new Foos(), "bar"));
SingleEntryFunctionRegistry registry = new SingleEntryFunctionRegistry(
this.delegate, "foo");
assertThat((Foos) registry.lookup("")).isInstanceOf(Function.class);
assertThat(((FluxFunction<?, ?>) registry.lookup("")).getTarget()).isInstanceOf(Foos.class);
}
class Foos implements Function<String, String> {

View File

@@ -16,11 +16,7 @@
package org.springframework.cloud.function.web.flux;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.InitializingBean;
@@ -29,8 +25,8 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.util.StringUtils;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping;
@@ -38,7 +34,7 @@ import org.springframework.web.server.ServerWebExchange;
/**
* @author Dave Syer
*
* @author Oleg Zhurakousky
*/
@Configuration
@ConditionalOnClass(RequestMappingHandlerMapping.class)
@@ -70,10 +66,6 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
}
}
@Override
protected void initHandlerMethods() {
}
@Override
public Mono<HandlerMethod> getHandlerInternal(ServerWebExchange request) {
String path = request.getRequest().getPath().pathWithinApplication().value();
@@ -87,10 +79,9 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
if (path.startsWith(this.prefix)) {
path = path.substring(this.prefix.length());
}
Object function = findFunctionForGet(request, path);
if (function == null) {
function = findFunctionForPost(request, path);
}
Object function = FunctionWebUtils
.findFunction(request.getRequest().getMethod(), this.functions, request.getAttributes(), path);
if (function != null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Found function for POST: " + path);
@@ -101,61 +92,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
return handler.filter(method -> actual != null);
}
private Object findFunctionForPost(ServerWebExchange request, String path) {
if (!request.getRequest().getMethod().equals(HttpMethod.POST)) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = this.functions.lookup(Consumer.class, path);
if (consumer != null) {
request.getAttributes().put(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = this.functions.lookup(Function.class, path);
if (function != null) {
request.getAttributes().put(WebRequestConstants.FUNCTION, function);
return function;
}
return null;
@Override
protected void initHandlerMethods() {
}
private Object findFunctionForGet(ServerWebExchange request, String path) {
if (!request.getRequest().getMethod().equals(HttpMethod.GET)) {
return null;
}
path = path.startsWith("/") ? path.substring(1) : path;
Object functionForGet = null;
Supplier<Publisher<?>> supplier = this.functions.lookup(Supplier.class, path);
if (supplier != null) {
request.getAttributes().put(WebRequestConstants.SUPPLIER, supplier);
functionForGet = supplier;
}
else {
StringBuilder builder = new StringBuilder();
String name = path;
String[] splitPath = path.split("/");
Function<Object, Object> function = null;
for (int i = 0; i < splitPath.length || function != null; i++) {
String element = splitPath[i];
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
function = this.functions.lookup(Function.class, name);
if (function != null) {
request.getAttributes().put(WebRequestConstants.FUNCTION, function);
String value = path.length() > name.length()
? path.substring(name.length() + 1) : null;
request.getAttributes().put(WebRequestConstants.ARGUMENT, value);
functionForGet = function;
}
}
}
return functionForGet;
}
}

View File

@@ -43,6 +43,7 @@ import org.springframework.cloud.function.web.BasicStringConverter;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.StringConverter;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ApplicationEvent;
@@ -59,6 +60,7 @@ import org.springframework.util.ClassUtils;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@@ -69,70 +71,58 @@ import static org.springframework.web.reactive.function.server.ServerResponse.st
/**
* @author Dave Syer
* @author Oleg Zhurakousky
* @since 2.0
*
*/
class FunctionEndpointInitializer
implements ApplicationContextInitializer<GenericApplicationContext> {
class FunctionEndpointInitializer implements ApplicationContextInitializer<GenericApplicationContext> {
@Override
public void initialize(GenericApplicationContext context) {
if (ContextFunctionCatalogInitializer.enabled
&& context.getEnvironment().getProperty(
FunctionalSpringApplication.SPRING_WEB_APPLICATION_TYPE,
WebApplicationType.class,
WebApplicationType.REACTIVE) == WebApplicationType.REACTIVE
&& context.getEnvironment().getProperty("spring.functional.enabled",
Boolean.class, false)
&& ClassUtils.isPresent(
"org.springframework.http.server.reactive.HttpHandler", null)) {
&& context.getEnvironment().getProperty(FunctionalSpringApplication.SPRING_WEB_APPLICATION_TYPE,
WebApplicationType.class, WebApplicationType.REACTIVE) == WebApplicationType.REACTIVE
&& context.getEnvironment().getProperty("spring.functional.enabled", Boolean.class, false)
&& ClassUtils.isPresent("org.springframework.http.server.reactive.HttpHandler", null)) {
registerEndpoint(context);
registerWebFluxAutoConfiguration(context);
}
}
private void registerWebFluxAutoConfiguration(GenericApplicationContext context) {
context.registerBean(DefaultErrorWebExceptionHandler.class,
() -> errorHandler(context));
context.registerBean(WebHttpHandlerBuilder.WEB_HANDLER_BEAN_NAME,
HttpWebHandlerAdapter.class, () -> httpHandler(context));
context.registerBean(DefaultErrorWebExceptionHandler.class, () -> errorHandler(context));
context.registerBean(WebHttpHandlerBuilder.WEB_HANDLER_BEAN_NAME, HttpWebHandlerAdapter.class,
() -> httpHandler(context));
context.addApplicationListener(new ServerListener(context));
}
private void registerEndpoint(GenericApplicationContext context) {
context.registerBean(StringConverter.class,
() -> new BasicStringConverter(context.getBean(FunctionInspector.class),
context.getBeanFactory()));
() -> new BasicStringConverter(context.getBean(FunctionInspector.class), context.getBeanFactory()));
context.registerBean(RequestProcessor.class,
() -> new RequestProcessor(context.getBean(FunctionInspector.class),
context.getBeanProvider(JsonMapper.class),
context.getBean(StringConverter.class),
context.getBeanProvider(JsonMapper.class), context.getBean(StringConverter.class),
context.getBeanProvider(ServerCodecConfigurer.class)));
context.registerBean(FunctionEndpointFactory.class,
() -> new FunctionEndpointFactory(context.getBean(FunctionCatalog.class),
context.getBean(FunctionInspector.class),
context.getBean(RequestProcessor.class),
context.getBean(FunctionInspector.class), context.getBean(RequestProcessor.class),
context.getEnvironment()));
context.registerBean(RouterFunction.class,
() -> context.getBean(FunctionEndpointFactory.class).functionEndpoints());
}
private HttpWebHandlerAdapter httpHandler(GenericApplicationContext context) {
return (HttpWebHandlerAdapter) RouterFunctions.toHttpHandler(
context.getBean(RouterFunction.class),
HandlerStrategies.empty()
.exceptionHandler(context.getBean(WebExceptionHandler.class))
return (HttpWebHandlerAdapter) RouterFunctions.toHttpHandler(context.getBean(RouterFunction.class),
HandlerStrategies.empty().exceptionHandler(context.getBean(WebExceptionHandler.class))
.codecs(config -> config.registerDefaults(true)).build());
}
private DefaultErrorWebExceptionHandler errorHandler(
GenericApplicationContext context) {
private DefaultErrorWebExceptionHandler errorHandler(GenericApplicationContext context) {
context.registerBean(ErrorAttributes.class, () -> new DefaultErrorAttributes());
context.registerBean(ErrorProperties.class, () -> new ErrorProperties());
context.registerBean(ResourceProperties.class, () -> new ResourceProperties());
DefaultErrorWebExceptionHandler handler = new DefaultErrorWebExceptionHandler(
context.getBean(ErrorAttributes.class),
context.getBean(ResourceProperties.class),
context.getBean(ErrorAttributes.class), context.getBean(ResourceProperties.class),
context.getBean(ErrorProperties.class), context);
ServerCodecConfigurer codecs = ServerCodecConfigurer.create();
handler.setMessageWriters(codecs.getWriters());
@@ -152,28 +142,22 @@ class FunctionEndpointInitializer
@Override
public void onApplicationEvent(ApplicationEvent event) {
ApplicationContext context = ((ContextRefreshedEvent) event)
.getApplicationContext();
ApplicationContext context = ((ContextRefreshedEvent) event).getApplicationContext();
if (context != this.context) {
return;
}
if (!ClassUtils.isPresent(
"org.springframework.http.server.reactive.HttpHandler", null)) {
if (!ClassUtils.isPresent("org.springframework.http.server.reactive.HttpHandler", null)) {
logger.info("No web server classes found so no server to start");
return;
}
Integer port = Integer.valueOf(context.getEnvironment()
.resolvePlaceholders("${server.port:${PORT:8080}}"));
String address = context.getEnvironment()
.resolvePlaceholders("${server.address:0.0.0.0}");
Integer port = Integer.valueOf(context.getEnvironment().resolvePlaceholders("${server.port:${PORT:8080}}"));
String address = context.getEnvironment().resolvePlaceholders("${server.address:0.0.0.0}");
if (port >= 0) {
HttpHandler handler = context.getBean(HttpHandler.class);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(
handler);
HttpServer httpServer = HttpServer.create().host(address).port(port)
.handle(adapter);
Thread thread = new Thread(() -> httpServer
.bindUntilJavaShutdown(Duration.ofSeconds(60), this::callback),
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer httpServer = HttpServer.create().host(address).port(port).handle(adapter);
Thread thread = new Thread(
() -> httpServer.bindUntilJavaShutdown(Duration.ofSeconds(60), this::callback),
"server-startup");
thread.setDaemon(false);
thread.start();
@@ -181,7 +165,7 @@ class FunctionEndpointInitializer
}
private void callback(DisposableServer server) {
logger.info("Server started");
logger.info("HTTP server started on port: " + server.port());
try {
double uptime = ManagementFactory.getRuntimeMXBean().getUptime();
logger.info("JVM running for " + uptime + "ms");
@@ -204,48 +188,54 @@ class FunctionEndpointFactory {
private static Log logger = LogFactory.getLog(FunctionEndpointFactory.class);
private Function<Flux<?>, Flux<?>> function;
private final FunctionCatalog functionCatalog;
private FunctionInspector inspector;
private final String handler;
private RequestProcessor processor;
private final FunctionInspector inspector;
FunctionEndpointFactory(FunctionCatalog catalog, FunctionInspector inspector,
RequestProcessor processor, Environment environment) {
private final RequestProcessor processor;
FunctionEndpointFactory(FunctionCatalog functionCatalog, FunctionInspector inspector, RequestProcessor processor,
Environment environment) {
String handler = environment.resolvePlaceholders("${function.handler}");
if (handler.startsWith("$")) {
handler = null;
}
this.processor = processor;
this.inspector = inspector;
this.function = extract(catalog, handler);
this.functionCatalog = functionCatalog;
this.handler = handler;
}
private Function<Flux<?>, Flux<?>> extract(FunctionCatalog catalog, String handler) {
Set<String> names = catalog.getNames(Function.class);
if (!names.isEmpty()) {
logger.info("Found functions: " + names);
if (handler != null) {
logger.info("Configured function: " + handler);
Assert.isTrue(names.contains(handler),
"Cannot locate function: " + handler);
return catalog.lookup(Function.class, handler);
}
return catalog.lookup(Function.class, names.iterator().next());
@SuppressWarnings("unchecked")
private Function<Flux<?>, Flux<?>> extract(ServerRequest request) {
Function<Flux<?>, Flux<?>> function;
if (handler != null) {
logger.info("Configured function: " + handler);
Set<String> names = this.functionCatalog.getNames(Function.class);
Assert.isTrue(names.contains(handler), "Cannot locate function: " + handler);
function = this.functionCatalog.lookup(Function.class, handler);
}
throw new IllegalStateException("No function defined");
else {
function = (Function<Flux<?>, Flux<?>>) FunctionWebUtils.findFunction(request.method(), functionCatalog,
request.attributes(), request.path());
}
return function;
}
@SuppressWarnings({ "unchecked" })
public <T> RouterFunction<?> functionEndpoints() {
return route(POST("/"), request -> {
Class<T> outputType = (Class<T>) this.inspector.getOutputType(this.function);
FunctionWrapper wrapper = RequestProcessor.wrapper(this.function, null, null);
return route(POST("/**"), request -> {
Function<Flux<?>, Flux<?>> function = extract(request);
Class<T> outputType = (Class<T>) this.inspector.getOutputType(function);
FunctionWrapper wrapper = RequestProcessor.wrapper(function, null, null);
Mono<ResponseEntity<?>> stream = request.bodyToMono(String.class)
.flatMap(content -> this.processor.post(wrapper, content, false));
return stream.flatMap(entity -> status(entity.getStatusCode())
.headers(headers -> headers.addAll(entity.getHeaders()))
.body(Mono.just((T) entity.getBody()), outputType));
return stream.flatMap(entity -> {
return status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders()))
.body(Mono.just((T) entity.getBody()), outputType);
});
});
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.util;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.http.HttpMethod;
public final class FunctionWebUtils {
private FunctionWebUtils() {
}
public static Object findFunction(HttpMethod method, FunctionCatalog functionCatalog, Map<String, Object> attributes, String path) {
if (method.equals(HttpMethod.GET)) {
return findFunctionForGet(functionCatalog, attributes, path);
}
else if (method.equals(HttpMethod.POST)) {
return findFunctionForPost(functionCatalog, attributes, path);
}
else {
throw new IllegalStateException("HTTP method '" + method + "' is not supported;");
}
}
private static Object findFunctionForGet(FunctionCatalog functionCatalog, Map<String, Object> attributes, String path) {
path = path.startsWith("/") ? path.substring(1) : path;
Object functionForGet = null;
Supplier<Publisher<?>> supplier = functionCatalog.lookup(Supplier.class, path);
if (supplier != null) {
attributes.put(WebRequestConstants.SUPPLIER, supplier);
functionForGet = supplier;
}
else {
StringBuilder builder = new StringBuilder();
String name = path;
String[] splitPath = path.split("/");
Function<Object, Object> function = null;
for (int i = 0; i < splitPath.length || function != null; i++) {
String element = splitPath[i];
if (builder.length() > 0) {
builder.append("/");
}
builder.append(element);
name = builder.toString();
function = functionCatalog.lookup(Function.class, name);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
String value = path.length() > name.length()
? path.substring(name.length() + 1) : null;
attributes.put(WebRequestConstants.ARGUMENT, value);
functionForGet = function;
}
}
}
return functionForGet;
}
private static Object findFunctionForPost(FunctionCatalog functionCatalog, Map<String, Object> attributes, String path) {
path = path.startsWith("/") ? path.substring(1) : path;
Consumer<Publisher<?>> consumer = functionCatalog.lookup(Consumer.class, path);
if (consumer != null) {
attributes.put(WebRequestConstants.CONSUMER, consumer);
return consumer;
}
Function<Object, Object> function = functionCatalog.lookup(Function.class, path);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
return function;
}
return null;
}
}

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.function;
import java.net.URI;
import java.util.function.Function;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
public class FunctionEndpointInitializerMVCTests {
@Before
public void init() throws Exception {
String port = "" + SocketUtils.findAvailableTcpPort();
System.setProperty("server.port", port);
}
@After
public void close() throws Exception {
System.clearProperty("server.port");
}
@Test
public void testSingleFunctionMapping() throws Exception {
SpringApplication.run(ApplicationConfiguration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
ResponseEntity<String> response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/uppercase"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("STRESSED");
response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/reverse"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("desserts");
}
@Test
public void testCompositionFunctionMapping() throws Exception {
SpringApplication.run(ApplicationConfiguration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
ResponseEntity<String> response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/uppercase,lowercase,reverse"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("desserts");
}
@SpringBootApplication
protected static class ApplicationConfiguration {
@Bean
public Function<String, String> uppercase() {
return s -> s.toUpperCase();
}
@Bean
public Function<String, String> lowercase() {
return s -> s.toLowerCase();
}
@Bean
public Function<String, String> reverse() {
return s -> new StringBuilder(s).reverse().toString();
}
// @Override
// public void initialize(GenericApplicationContext applicationContext) {
// applicationContext.registerBean("uppercase", FunctionRegistration.class,
// () -> new FunctionRegistration<>(uppercase())
// .type(FunctionType.from(String.class).to(String.class)));
// applicationContext.registerBean("reverse", FunctionRegistration.class,
// () -> new FunctionRegistration<>(reverse())
// .type(FunctionType.from(String.class).to(String.class)));
// applicationContext.registerBean("lowercase", FunctionRegistration.class,
// () -> new FunctionRegistration<>(lowercase())
// .type(FunctionType.from(String.class).to(String.class)));
// }
}
}

View File

@@ -0,0 +1,106 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.web.function;
import java.net.URI;
import java.util.function.Function;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.FunctionalSpringApplication;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.http.ResponseEntity;
import org.springframework.util.SocketUtils;
import static org.assertj.core.api.Assertions.assertThat;
public class FunctionEndpointInitializerTests {
@Before
public void init() throws Exception {
String port = "" + SocketUtils.findAvailableTcpPort();
System.setProperty("server.port", port);
}
@After
public void close() throws Exception {
System.clearProperty("server.port");
}
@Test
public void testSingleFunctionMapping() throws Exception {
FunctionalSpringApplication.run(ApplicationConfiguration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
Thread.sleep(200);
ResponseEntity<String> response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/uppercase"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("STRESSED");
response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/reverse"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("desserts");
}
@Test
public void testCompositionFunctionMapping() throws Exception {
FunctionalSpringApplication.run(ApplicationConfiguration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
Thread.sleep(200);
ResponseEntity<String> response = testRestTemplate.postForEntity(new URI("http://localhost:" + port + "/uppercase,lowercase,reverse"), "stressed", String.class);
assertThat(response.getBody()).isEqualTo("desserts");
}
@SpringBootConfiguration
protected static class ApplicationConfiguration
implements ApplicationContextInitializer<GenericApplicationContext> {
public Function<String, String> uppercase() {
return s -> s.toUpperCase();
}
public Function<String, String> lowercase() {
return s -> s.toLowerCase();
}
public Function<String, String> reverse() {
return s -> new StringBuilder(s).reverse().toString();
}
@Override
public void initialize(GenericApplicationContext applicationContext) {
applicationContext.registerBean("uppercase", FunctionRegistration.class,
() -> new FunctionRegistration<>(uppercase())
.type(FunctionType.from(String.class).to(String.class)));
applicationContext.registerBean("reverse", FunctionRegistration.class,
() -> new FunctionRegistration<>(reverse())
.type(FunctionType.from(String.class).to(String.class)));
applicationContext.registerBean("lowercase", FunctionRegistration.class,
() -> new FunctionRegistration<>(lowercase())
.type(FunctionType.from(String.class).to(String.class)));
}
}
}