Add support for detecting FunctionRegistration or Function

User can now provide a Function or an ApplicationInitializer. Also
the initializer can create a FunctionRegistration with the handler
name instead of a bean with the handler name. Better control of
input and output types that way.

Fixes gh-231
This commit is contained in:
Dave Syer
2018-11-09 12:35:36 +00:00
parent d1b9a9b3fb
commit aba50816f7
10 changed files with 281 additions and 63 deletions

View File

@@ -16,10 +16,13 @@
package org.springframework.cloud.function.web;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -38,15 +41,29 @@ import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FluxWrapper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.ResponseEntity.BodyBuilder;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.messaging.Message;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -65,11 +82,16 @@ public class RequestProcessor {
private final JsonMapper mapper;
public RequestProcessor(FunctionInspector inspector, ObjectProvider<JsonMapper> mapper,
StringConverter converter) {
private final List<HttpMessageReader<?>> messageReaders;
public RequestProcessor(FunctionInspector inspector,
ObjectProvider<JsonMapper> mapper, StringConverter converter,
ObjectProvider<ServerCodecConfigurer> codecs) {
this.mapper = mapper.getIfAvailable();
this.inspector = inspector;
this.converter = converter;
ServerCodecConfigurer source = codecs.getIfAvailable();
this.messageReaders = source == null ? null : source.getReaders();
}
public static FunctionWrapper wrapper(
@@ -95,6 +117,12 @@ public class RequestProcessor {
}
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper,
ServerWebExchange exchange) {
return Mono.from(body(wrapper.handler(), exchange))
.flatMap(body -> response(wrapper, body, false));
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body,
boolean stream) {
Object function = wrapper.handler();
@@ -102,12 +130,15 @@ public class RequestProcessor {
Type itemType = getItemType(function);
Object input = body;
if (StringUtils.hasText(body) && this.mapper!=null) {
if (StringUtils.hasText(body) && this.mapper != null) {
if (body.startsWith("[")) {
Class<?> collectionType = Collection.class.isAssignableFrom(inputType) ? inputType : Collection.class;
input = mapper.toObject(body, ResolvableType
.forClassWithGenerics(collectionType, (Class<?>) itemType)
.getType());
Class<?> collectionType = Collection.class.isAssignableFrom(inputType)
? inputType
: Collection.class;
input = mapper.toObject(body,
ResolvableType
.forClassWithGenerics(collectionType, (Class<?>) itemType)
.getType());
}
else {
if (inputType == String.class) {
@@ -124,7 +155,7 @@ public class RequestProcessor {
}
}
}
return post(wrapper, input, null, stream);
return response(wrapper, input, stream);
}
public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
@@ -134,19 +165,17 @@ public class RequestProcessor {
return stream(request, result);
}
private Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, Object body,
MultiValueMap<String, String> params, boolean stream) {
private Mono<ResponseEntity<?>> response(FunctionWrapper wrapper, Object body,
boolean stream) {
Iterable<?> iterable = body instanceof Collection ? (Collection<?>) body
: (body instanceof Set ? Collections.singleton(body) : Collections.singletonList(body));
: (body instanceof Set ? Collections.singleton(body)
: Collections.singletonList(body));
Function<Publisher<?>, Publisher<?>> function = wrapper.function();
Consumer<Publisher<?>> consumer = wrapper.consumer();
MultiValueMap<String, String> form = wrapper.params();
if (params != null) {
form.putAll(params);
}
boolean inputIsCollection = Collection.class
.isAssignableFrom(inspector.getInputType(wrapper.handler()));
@@ -250,6 +279,98 @@ public class RequestProcessor {
}
}
private Publisher<?> body(Object handler, ServerWebExchange exchange) {
ResolvableType elementType = ResolvableType
.forClass(this.inspector.getInputType(handler));
ResolvableType actualType = elementType;
Class<?> resolvedType = elementType.resolve();
ReactiveAdapter adapter = (resolvedType != null
? getAdapterRegistry().getAdapter(resolvedType)
: null);
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
MediaType contentType = request.getHeaders().getContentType();
MediaType mediaType = (contentType != null ? contentType
: MediaType.APPLICATION_OCTET_STREAM);
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + (contentType != null
? "Content-Type:" + contentType
: "No Content-Type, using " + MediaType.APPLICATION_OCTET_STREAM));
}
boolean isBodyRequired = (adapter != null && !adapter.supportsEmpty());
MethodParameter bodyParam = new MethodParameter(handlerMethod(handler), 0);
for (HttpMessageReader<?> reader : getMessageReaders()) {
if (reader.canRead(elementType, mediaType)) {
Map<String, Object> readHints = Hints.from(Hints.LOG_PREFIX_HINT,
exchange.getLogPrefix());
if (adapter != null && adapter.isMultiValue()) {
if (logger.isDebugEnabled()) {
logger.debug(
exchange.getLogPrefix() + "0..N [" + elementType + "]");
}
Flux<?> flux = reader.read(actualType, elementType, request, response,
readHints);
flux = flux.onErrorResume(
ex -> Flux.error(handleReadError(bodyParam, ex)));
if (isBodyRequired) {
flux = flux.switchIfEmpty(
Flux.error(() -> handleMissingBody(bodyParam)));
}
return Mono.just(adapter.fromPublisher(flux));
}
else {
// Single-value (with or without reactive type wrapper)
if (logger.isDebugEnabled()) {
logger.debug(
exchange.getLogPrefix() + "0..1 [" + elementType + "]");
}
Mono<?> mono = reader.readMono(actualType, elementType, request,
response, readHints);
mono = mono.onErrorResume(
ex -> Mono.error(handleReadError(bodyParam, ex)));
if (isBodyRequired) {
mono = mono.switchIfEmpty(
Mono.error(() -> handleMissingBody(bodyParam)));
}
return (adapter != null ? Mono.just(adapter.fromPublisher(mono))
: Mono.from(mono));
}
}
}
return Mono.error(new UnsupportedMediaTypeStatusException(mediaType,
Arrays.asList(MediaType.APPLICATION_JSON), elementType));
}
private Method handlerMethod(Object handler) {
return ReflectionUtils.findMethod(handler.getClass(), "apply", (Class<?>[]) null);
}
public List<HttpMessageReader<?>> getMessageReaders() {
return this.messageReaders;
}
private Throwable handleReadError(MethodParameter parameter, Throwable ex) {
return (ex instanceof DecodingException
? new ServerWebInputException("Failed to read HTTP message", parameter,
ex)
: ex);
}
private ServerWebInputException handleMissingBody(MethodParameter param) {
return new ServerWebInputException(
"Request body is missing: " + param.getExecutable().toGenericString());
}
private ReactiveAdapterRegistry getAdapterRegistry() {
return ReactiveAdapterRegistry.getSharedInstance();
}
private Publisher<?> value(Function<Publisher<?>, Publisher<?>> function,
Publisher<String> value) {
Flux<?> input = Flux.from(value).map(body -> converter.convert(function, body));

View File

@@ -83,6 +83,13 @@ public class FunctionController {
return map;
}
@PostMapping(path = "/**", consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> post(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return processor.post(wrapper, request);
}
@PostMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> post(ServerWebExchange request,

View File

@@ -106,7 +106,8 @@ class FunctionEndpointInitializer
context.registerBean(RequestProcessor.class,
() -> new RequestProcessor(context.getBean(FunctionInspector.class),
context.getBeanProvider(JsonMapper.class),
context.getBean(StringConverter.class)));
context.getBean(StringConverter.class),
context.getBeanProvider(ServerCodecConfigurer.class)));
context.registerBean(FunctionEndpointFactory.class,
() -> new FunctionEndpointFactory(context.getBean(FunctionCatalog.class),
context.getBean(FunctionInspector.class),
@@ -226,7 +227,8 @@ class FunctionEndpointFactory {
logger.info("Found functions: " + names);
if (handler != null) {
logger.info("Configured function: " + handler);
Assert.isTrue(names.contains(handler), "Cannot locate function: " + handler);
Assert.isTrue(names.contains(handler),
"Cannot locate function: " + handler);
return catalog.lookup(Function.class, handler);
}
return catalog.lookup(Function.class, names.iterator().next());