Initial refactoring and simplificatioin of web module

This commit is contained in:
Oleg Zhurakousky
2020-11-02 14:09:38 +01:00
parent 9b1206f6af
commit 9a715be835
15 changed files with 231 additions and 394 deletions

View File

@@ -16,19 +16,13 @@
package org.springframework.cloud.function.web;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
@@ -45,33 +39,18 @@ import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxedConsumer;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.DecodingException;
import org.springframework.core.codec.Hints;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.ResponseEntity.BodyBuilder;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.messaging.Message;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.ServerWebInputException;
import org.springframework.web.server.UnsupportedMediaTypeStatusException;
/**
* @author Dave Syer
@@ -84,102 +63,125 @@ public class RequestProcessor {
private final FunctionCatalog functionCatalog;
private final StringConverter converter;
private final JsonMapper mapper;
private final List<HttpMessageReader<?>> messageReaders;
public RequestProcessor(FunctionCatalog functionCatalog,
ObjectProvider<JsonMapper> mapper, StringConverter converter,
ObjectProvider<JsonMapper> mapper,
ObjectProvider<ServerCodecConfigurer> codecs) {
this.mapper = mapper.getIfAvailable();
this.functionCatalog = functionCatalog;
this.converter = converter;
ServerCodecConfigurer source = codecs.getIfAvailable();
this.messageReaders = source == null ? null : source.getReaders();
}
public static FunctionWrapper wrapper(
Function<? extends Publisher<?>, ? extends Publisher<?>> function,
Consumer<? extends Publisher<?>> consumer,
Supplier<? extends Publisher<?>> supplier) {
return new FunctionWrapper(function, supplier);
}
public static FunctionWrapper wrapper(
Function<? extends Publisher<?>, ? extends Publisher<?>> function) {
return new FunctionWrapper(function, null);
public static FunctionWrapper wrapper(FunctionInvocationWrapper function) {
return new FunctionWrapper(function);
}
@SuppressWarnings("rawtypes")
public Mono<ResponseEntity<?>> get(FunctionWrapper wrapper) {
if (wrapper.function() != null) {
if (wrapper.function().isFunction()) {
return response(wrapper, wrapper.function(), value(wrapper), true, true);
}
else {
Object result = wrapper.supplier().get();
return response(wrapper, wrapper.supplier(), result instanceof Publisher ? (Publisher) result : Flux.just(result), null,
FunctionInvocationWrapper function = (wrapper.function);
Object result = FunctionWebUtils.invokeFunction(function, null, false);
return response(wrapper, wrapper.function(), result instanceof Publisher ? (Publisher) result : Flux.just(result), null,
true);
}
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper,
ServerWebExchange exchange) {
Mono<ResponseEntity<?>> responseEntity = Mono
.from(body(wrapper.handler(), exchange))
.doOnError(e -> logger.error("Failed to generate POST input for function: " + wrapper.function, e))
.flatMap(body -> response(wrapper, body, false));
return responseEntity;
}
public Mono<ResponseEntity<?>> post(FunctionWrapper wrapper, String body,
boolean stream) {
Object function = wrapper.handler();
Class<?> inputType = function == null
? Object.class
: FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) function).getInputType()));
FunctionInvocationWrapper function = (FunctionInvocationWrapper) wrapper.handler();
Type itemType = getItemType(function);
Object input = body == null && inputType.isAssignableFrom(String.class) ? "" : body;
Object input = body == null ? "" : body;
if ((isInputMultiple(this.getTargetIfRouting(wrapper, function)) || !(function instanceof RoutingFunction))
&& input != null) { // TODO rework. . . pretty ugly
if (this.shouldUseJsonConversion((String) input, wrapper.headers.getContentType())) {
Type jsonType = body.startsWith("[")
&& Collection.class.isAssignableFrom(inputType)
|| body.startsWith("{") ? inputType : Collection.class;
if (body.startsWith("[") && itemType instanceof Class) {
jsonType = ResolvableType.forClassWithGenerics((Class<?>) jsonType,
(Class<?>) itemType).getType();
}
input = this.mapper.fromJson((String) input, jsonType);
}
else {
input = this.converter.convert(function, (String) input);
}
/*
* We need this to ensure that imperative function which are sent array-like input
* can be invoked with each item and then aggregated
*/
if (input != null && JsonMapper.isJsonStringRepresentsCollection(input)) {
Type type = FunctionTypeUtils.isTypeCollection(itemType)
? ResolvableType.forType(itemType).getType()
: ResolvableType.forClassWithGenerics(Collection.class, ResolvableType.forType(itemType)).asCollection().getType();
input = this.mapper.fromJson((String) input, type);
}
return response(wrapper, input, stream);
}
public Mono<ResponseEntity<?>> stream(FunctionWrapper request) {
Publisher<?> result = request.function() != null
? value(request)
: request.supplier().get();
return stream(request, result);
public Mono<ResponseEntity<?>> stream(FunctionWrapper functionWrapper) {
Publisher<?> result = functionWrapper.function.isFunction()
? value(functionWrapper)
: (Publisher<?>) functionWrapper.function.get();
return stream(functionWrapper, result);
}
private boolean shouldUseJsonConversion(String body, MediaType contentType) {
return (body.startsWith("[") || body.startsWith("{"))
&& (contentType == null || (contentType != null
&& !"text".equalsIgnoreCase(contentType.getType())));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Mono<ResponseEntity<?>> response(FunctionWrapper wrapper, Object body, boolean stream) {
private List<HttpMessageReader<?>> getMessageReaders() {
return this.messageReaders;
FunctionInvocationWrapper function = (wrapper.function());
Flux<?> flux;
Class<?> inputType = function == null
? Object.class
: FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(function.getInputType()));
if (MultiValueMap.class.isAssignableFrom(inputType)) {
body = null;
flux = Flux.just(wrapper.params());
}
else if (body != null) {
if (Collection.class.isAssignableFrom(inputType)) {
flux = Flux.just(body);
}
else if (body instanceof Flux) {
flux = Flux.from((Flux) body);
}
else {
Iterable<?> iterable = body instanceof Collection
? (Collection<?>) body
: Collections.singletonList(body);
flux = Flux.fromIterable(iterable);
}
}
else {
throw new IllegalStateException(
"Failed to determine input for function call with parameters: '"
+ wrapper.params + "' and headers: `" + wrapper.headers
+ "`");
}
if (function != null) {
flux = messages(wrapper, function, flux);
}
Mono<ResponseEntity<?>> responseEntityMono = null;
if (function == null) {
responseEntityMono = Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND)
.body("Function for provided path can not be found"));
}
else {
Publisher<?> result = (Publisher<?>) FunctionWebUtils.invokeFunction(function, flux, function.isInputTypeMessage());
if (function.isConsumer()) {
if (result != null) {
((Mono) result).subscribe();
}
logger.debug("Handled POST with consumer");
responseEntityMono = Mono.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
else {
result = Flux.from((Publisher) result);
logger.debug("Handled POST with function: " + function);
if (stream) {
responseEntityMono = stream(wrapper, result);
}
else {
responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
body == null ? null : !(body instanceof Collection), false);
}
}
}
return responseEntityMono;
}
private Mono<ResponseEntity<?>> response(FunctionWrapper request, Object handler,
@@ -211,91 +213,6 @@ public class RequestProcessor {
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
public Mono<ResponseEntity<?>> response(FunctionWrapper wrapper, Object body,
boolean stream) {
Function function = wrapper.function();
Flux<?> flux;
Class<?> inputType = function == null ? Object.class : FunctionTypeUtils
.getRawType(FunctionTypeUtils.getGenericType(((FunctionInvocationWrapper) wrapper.handler()).getInputType()));
if (body != null) {
if (Collection.class.isAssignableFrom(inputType)) {
flux = Flux.just(body);
}
else if (body instanceof Flux) {
flux = Flux.from((Flux) body);
}
else {
Iterable<?> iterable = body instanceof Collection ? (Collection<?>) body
: (body instanceof Set ? Collections.singleton(body)
: Collections.singletonList(body));
flux = Flux.fromIterable(iterable);
}
}
else if (MultiValueMap.class.isAssignableFrom(inputType)) {
flux = Flux.just(wrapper.params());
}
else {
throw new IllegalStateException(
"Failed to determine input for function call with parameters: '"
+ wrapper.params + "' and headers: `" + wrapper.headers
+ "`");
}
if (function != null && ((FunctionInvocationWrapper) function).isInputTypeMessage()) {
flux = messages(wrapper, function, flux);
}
Mono<ResponseEntity<?>> responseEntityMono = null;
if (function == null) {
responseEntityMono = Mono.just(ResponseEntity.status(HttpStatus.NOT_FOUND)
.body("Function for provided path can not be found"));
}
else if (function instanceof FluxedConsumer || function instanceof FluxConsumer) {
((Mono<?>) function.apply(flux)).subscribe();
logger.debug("Handled POST with consumer");
responseEntityMono = Mono.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
else if (function instanceof FunctionInvocationWrapper) {
Publisher<?> result = (Publisher<?>) FunctionWebUtils.invokeFunction((FunctionInvocationWrapper) function, flux,
((FunctionInvocationWrapper) function).isInputTypeMessage());
if (((FunctionInvocationWrapper) function).isConsumer()) {
if (result != null) {
((Mono) result).subscribe();
}
logger.debug("Handled POST with consumer");
responseEntityMono = Mono
.just(ResponseEntity.status(HttpStatus.ACCEPTED).build());
}
else {
result = Flux.from((Publisher) result);
logger.debug("Handled POST with function: " + function);
if (stream) {
responseEntityMono = stream(wrapper, result);
}
else {
responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
body == null ? null : !(body instanceof Collection), false);
}
}
}
else {
Flux<?> result = Flux.from((Publisher) function.apply(flux));
logger.debug("Handled POST with function");
if (stream) {
responseEntityMono = stream(wrapper, result);
}
else {
responseEntityMono = response(wrapper, getTargetIfRouting(wrapper, function), result,
body == null ? null : !(body instanceof Collection), false);
}
}
return responseEntityMono;
}
/*
* Called when building response and returns the actual
* target function in case the current function is RoutingFunction.
@@ -310,9 +227,9 @@ public class RequestProcessor {
return function;
}
// this seem to be very relevant to AWS container tests
private Flux<?> messages(FunctionWrapper request, Object function, Flux<?> flux) {
Map<String, Object> headers = new HashMap<>(HeaderUtils.fromHttp(request.headers()));
if (function instanceof FunctionInvocationWrapper) {
headers.put("scf-func-name", ((FunctionInvocationWrapper) function).getFunctionDefinition());
}
@@ -352,131 +269,25 @@ public class RequestProcessor {
private boolean isOutputSingle(Object handler) {
FunctionInvocationWrapper function = (FunctionInvocationWrapper) handler;
Type outputType = function.getOutputType();
// if (function.isOutputTypePublisher()) {
// outputType = FunctionTypeUtils.getGenericType(outputType);
// }
// if (function.isOutputTypeMessage()) {
// outputType = FunctionTypeUtils.getGenericType(outputType);
// }
Class<?> type = FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(outputType));
// Class<?> type1 = this.inspector.getOutputType(handler);
// Class<?> wrapper1 = this.inspector.getOutputWrapper(handler);
Class<?> wrapper = function.isOutputTypePublisher() ? FunctionTypeUtils.getRawType(outputType) : type;
if (Stream.class.isAssignableFrom(type)) {
return false;
}
else {
return wrapper == type || Mono.class.equals(wrapper)
|| Optional.class.equals(wrapper);
}
}
private Publisher<?> body(Object handler, ServerWebExchange exchange) {
FunctionInvocationWrapper function = (FunctionInvocationWrapper) handler;
Class<?> inputType = FunctionTypeUtils
.getRawType(FunctionTypeUtils.getGenericType(function.getInputType()));
ResolvableType elementType = ResolvableType.forClass(inputType);
// we effectively delegate type conversion to FunctionCatalog
elementType = ResolvableType.forClass(String.class);
ResolvableType actualType = elementType;
Class<?> resolvedType = elementType.resolve();
ReactiveAdapter adapter = (resolvedType != null
? getAdapterRegistry().getAdapter(resolvedType) : null);
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
MediaType contentType = request.getHeaders().getContentType();
MediaType mediaType = (contentType != null ? contentType
: MediaType.APPLICATION_OCTET_STREAM);
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + (contentType != null
? "Content-Type:" + contentType
: "No Content-Type, using " + MediaType.APPLICATION_OCTET_STREAM));
}
boolean isBodyRequired = (adapter != null && !adapter.supportsEmpty());
MethodParameter bodyParam = new MethodParameter(handlerMethod(handler), 0);
for (HttpMessageReader<?> reader : getMessageReaders()) {
if (reader.canRead(elementType, mediaType)) {
Map<String, Object> readHints = Hints.from(Hints.LOG_PREFIX_HINT,
exchange.getLogPrefix());
if (adapter != null && adapter.isMultiValue()) {
if (logger.isDebugEnabled()) {
logger.debug(
exchange.getLogPrefix() + "0..N [" + elementType + "]");
}
Flux<?> flux = reader.read(actualType, elementType, request, response,
readHints);
flux = flux.onErrorResume(
ex -> Flux.error(handleReadError(bodyParam, ex)));
if (isBodyRequired) {
flux = flux.switchIfEmpty(
Flux.error(() -> handleMissingBody(bodyParam)));
}
return Mono.just(adapter.fromPublisher(flux));
}
else {
// Single-value (with or without reactive type wrapper)
if (logger.isDebugEnabled()) {
logger.debug(exchange.getLogPrefix() + "0..1 [" + elementType + "]");
}
Mono<?> mono = reader.readMono(actualType, elementType, request,
response, readHints).doOnNext(v -> {
if (logger.isDebugEnabled()) {
logger.debug("received: " + v);
}
});
mono = mono.onErrorResume(
ex -> Mono.error(handleReadError(bodyParam, ex)));
if (isBodyRequired) {
mono = mono.switchIfEmpty(
Mono.error(() -> handleMissingBody(bodyParam)));
}
return (adapter != null ? Mono.just(adapter.fromPublisher(mono))
: Mono.from(mono));
}
}
}
return Mono.error(new UnsupportedMediaTypeStatusException(mediaType,
Arrays.asList(MediaType.APPLICATION_JSON), elementType));
}
private Method handlerMethod(Object handler) {
return ReflectionUtils.findMethod(handler.getClass(), "apply", (Class<?>[]) null);
}
private Throwable handleReadError(MethodParameter parameter, Throwable ex) {
return (ex instanceof DecodingException ? new ServerWebInputException(
"Failed to read HTTP message", parameter, ex) : ex);
}
private ServerWebInputException handleMissingBody(MethodParameter param) {
return new ServerWebInputException(
"Request body is missing: " + param.getExecutable().toGenericString());
}
private ReactiveAdapterRegistry getAdapterRegistry() {
return ReactiveAdapterRegistry.getSharedInstance();
}
private Publisher<?> value(FunctionWrapper wrapper) {
Flux<?> input = Flux.from(wrapper.argument)
.map(body -> this.converter.convert(wrapper.function, body));
if (((FunctionInvocationWrapper) (Object) wrapper.function).isInputTypeMessage()) {
input = messages(wrapper, wrapper.function, input);
}
return Mono.from(wrapper.function.apply(input));
Flux<?> input = Flux.from(wrapper.argument);
FunctionInvocationWrapper function = (wrapper.function);
Object result = FunctionWebUtils.invokeFunction(function, input, function.isInputTypeMessage());
return Mono.from((Publisher<?>) result);
}
private Type getItemType(Object function) {
if (function == null || ((FunctionInvocationWrapper) function).getInputType() == Object.class) {
return Object.class;
}
@@ -501,7 +312,6 @@ public class RequestProcessor {
return inputType;
}
// Type type = this.inspector.getRegistration(function).getType().getType();
Type type = ((FunctionInvocationWrapper) function).getInputType();
if (type instanceof ParameterizedType) {
type = ((ParameterizedType) type).getActualTypeArguments()[0];
@@ -528,9 +338,7 @@ public class RequestProcessor {
*/
public static class FunctionWrapper {
private final Function<Publisher<?>, Publisher<?>> function;
private final Supplier<Publisher<?>> supplier;
private final FunctionInvocationWrapper function;
private final MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
@@ -538,26 +346,21 @@ public class RequestProcessor {
private Publisher<String> argument;
@SuppressWarnings("unchecked")
public FunctionWrapper(
Function<? extends Publisher<?>, ? extends Publisher<?>> function,
Supplier<? extends Publisher<?>> supplier) {
this.function = (Function<Publisher<?>, Publisher<?>>) function;
this.supplier = (Supplier<Publisher<?>>) supplier;
public FunctionWrapper(FunctionInvocationWrapper function) {
this.function = function;
}
public Object handler() {
return this.function != null
? this.function
: this.supplier;
}
public Function<Publisher<?>, Publisher<?>> function() {
return this.function;
}
public Supplier<Publisher<?>> supplier() {
return this.supplier;
public FunctionInvocationWrapper function() {
return this.function;
}
@Deprecated
public Supplier<?> supplier() {
return this.function;
}
public MultiValueMap<String, String> params() {
@@ -591,7 +394,5 @@ public class RequestProcessor {
public Publisher<String> argument() {
return this.argument;
}
}
}

View File

@@ -16,14 +16,10 @@
package org.springframework.cloud.function.web.flux;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
@@ -83,13 +79,6 @@ public class FunctionController {
return map;
}
@PostMapping(path = "/**", consumes = MediaType.APPLICATION_OCTET_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<?>> post(ServerWebExchange request) {
FunctionWrapper wrapper = wrapper(request);
return this.processor.post(wrapper, request);
}
@PostMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> post(ServerWebExchange request,
@@ -120,16 +109,9 @@ public class FunctionController {
}
private FunctionWrapper wrapper(ServerWebExchange request) {
@SuppressWarnings("unchecked")
Function<Publisher<?>, Publisher<?>> function = (Function<Publisher<?>, Publisher<?>>) request
.getAttribute(WebRequestConstants.FUNCTION);
@SuppressWarnings("unchecked")
Consumer<Publisher<?>> consumer = (Consumer<Publisher<?>>) request
.getAttribute(WebRequestConstants.CONSUMER);
@SuppressWarnings("unchecked")
Supplier<Publisher<?>> supplier = (Supplier<Publisher<?>>) request
.getAttribute(WebRequestConstants.SUPPLIER);
FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier);
FunctionInvocationWrapper function = (FunctionInvocationWrapper) request
.getAttribute(WebRequestConstants.HANDLER);
FunctionWrapper wrapper = RequestProcessor.wrapper(function);
wrapper.headers(request.getRequest().getHeaders());
wrapper.params(request.getRequest().getQueryParams());
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT);

View File

@@ -20,7 +20,6 @@ import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -42,10 +41,8 @@ import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.cloud.function.web.BasicStringConverter;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.StringConverter;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.FunctionWebUtils;
import org.springframework.context.ApplicationContext;
@@ -103,12 +100,10 @@ class FunctionEndpointInitializer implements ApplicationContextInitializer<Gener
}
private void registerEndpoint(GenericApplicationContext context) {
context.registerBean(StringConverter.class,
() -> new BasicStringConverter(context.getBeanFactory()));
context.registerBean(RequestProcessor.class,
() -> new RequestProcessor(
context.getBean(FunctionCatalog.class),
context.getBeanProvider(JsonMapper.class), context.getBean(StringConverter.class),
context.getBeanProvider(JsonMapper.class),
context.getBeanProvider(ServerCodecConfigurer.class)));
context.registerBean(FunctionEndpointFactory.class,
() -> new FunctionEndpointFactory(context.getBean(FunctionCatalog.class), context.getBean(RequestProcessor.class),
@@ -198,8 +193,6 @@ class FunctionEndpointFactory {
private final String handler;
// private final FunctionInspector inspector;
private final RequestProcessor processor;
FunctionEndpointFactory(FunctionCatalog functionCatalog, RequestProcessor processor,
@@ -209,13 +202,12 @@ class FunctionEndpointFactory {
handler = null;
}
this.processor = processor;
// this.inspector = inspector;
this.functionCatalog = functionCatalog;
this.handler = handler;
}
private Object extract(ServerRequest request) {
Object function;
private FunctionInvocationWrapper extract(ServerRequest request) {
FunctionInvocationWrapper function;
if (handler != null) {
logger.info("Configured function: " + handler);
Set<String> names = this.functionCatalog.getNames(Function.class);
@@ -233,12 +225,11 @@ class FunctionEndpointFactory {
@SuppressWarnings({ "unchecked" })
public <T> RouterFunction<?> functionEndpoints() {
return route(POST("/**"), request -> {
Object function = extract(request);
FunctionInvocationWrapper funcWrapper = (FunctionInvocationWrapper) function;
FunctionInvocationWrapper funcWrapper = extract(request);
Class<?> outputType = funcWrapper == null
? Object.class
: FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(funcWrapper.getOutputType()));
FunctionWrapper wrapper = RequestProcessor.wrapper((Function<Flux<?>, Flux<?>>) function, null, null);
FunctionWrapper wrapper = RequestProcessor.wrapper(funcWrapper);
Mono<ResponseEntity<?>> stream = request.bodyToMono(String.class)
.flatMap(content -> this.processor.post(wrapper, content, false));
return stream.flatMap(entity -> {
@@ -247,28 +238,21 @@ class FunctionEndpointFactory {
});
})
.andRoute(GET("/**"), request -> {
Object functionComponent = extract(request);
FunctionInvocationWrapper funcWrapper = (FunctionInvocationWrapper) functionComponent;
FunctionInvocationWrapper funcWrapper = extract(request);
Class<?> outputType = FunctionTypeUtils.getRawType(FunctionTypeUtils.getGenericType(funcWrapper.getOutputType()));
if (((FunctionInvocationWrapper) functionComponent).isSupplier()) {
Supplier<? extends Flux<?>> supplier = (Supplier<Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(null, null, supplier);
//Object result = wrapper.supplier().get();
Object func = wrapper.supplier();
Object result = FunctionWebUtils.invokeFunction((FunctionInvocationWrapper) func, null, ((FunctionInvocationWrapper) func).isInputTypeMessage());
if (funcWrapper.isSupplier()) {
Object result = FunctionWebUtils.invokeFunction(funcWrapper, null, funcWrapper.isInputTypeMessage());
if (!(result instanceof Publisher)) {
result = Mono.just(result);
}
return ServerResponse.ok().body(result, outputType);
}
else {
Function<Flux<?>, Flux<?>> function = (Function<Flux<?>, Flux<?>>) functionComponent;
FunctionWrapper wrapper = RequestProcessor.wrapper(function, null, null);
FunctionWrapper wrapper = RequestProcessor.wrapper(funcWrapper);
wrapper.headers(request.headers().asHttpHeaders());
String argument = (String) request.attribute(WebRequestConstants.ARGUMENT).get();
wrapper.argument(Flux.just(argument));
Object func = wrapper.function();
Object result = FunctionWebUtils.invokeFunction((FunctionInvocationWrapper) func, wrapper.argument(), ((FunctionInvocationWrapper) func).isInputTypeMessage());
Object result = FunctionWebUtils.invokeFunction(funcWrapper, wrapper.argument(), funcWrapper.isInputTypeMessage());
return ServerResponse.ok().body(result, outputType);
}
});

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2012-2019 the original author or authors.
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,13 +18,11 @@ package org.springframework.cloud.function.web.mvc;
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
@@ -93,16 +91,9 @@ public class FunctionController {
}
private FunctionWrapper wrapper(WebRequest request) {
@SuppressWarnings("unchecked")
Function<Publisher<?>, Publisher<?>> function = (Function<Publisher<?>, Publisher<?>>) request
.getAttribute(WebRequestConstants.FUNCTION, WebRequest.SCOPE_REQUEST);
@SuppressWarnings("unchecked")
Consumer<Publisher<?>> consumer = (Consumer<Publisher<?>>) request
.getAttribute(WebRequestConstants.CONSUMER, WebRequest.SCOPE_REQUEST);
@SuppressWarnings("unchecked")
Supplier<Publisher<?>> supplier = (Supplier<Publisher<?>>) request
.getAttribute(WebRequestConstants.SUPPLIER, WebRequest.SCOPE_REQUEST);
FunctionWrapper wrapper = RequestProcessor.wrapper(function, consumer, supplier);
FunctionInvocationWrapper function = (FunctionInvocationWrapper) request
.getAttribute(WebRequestConstants.HANDLER, WebRequest.SCOPE_REQUEST);
FunctionWrapper wrapper = RequestProcessor.wrapper(function);
for (String key : request.getParameterMap().keySet()) {
wrapper.params().addAll(key, Arrays.asList(request.getParameterValues(key)));
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,10 +18,10 @@ package org.springframework.cloud.function.web.util;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
@@ -33,17 +33,17 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Oleg Zhurakousky
*
*/
public final class FunctionWebUtils {
private FunctionWebUtils() {
}
public static Object findFunction(HttpMethod method, FunctionCatalog functionCatalog,
public static FunctionInvocationWrapper findFunction(HttpMethod method, FunctionCatalog functionCatalog,
Map<String, Object> attributes, String path, String[] acceptContentTypes) {
if (method.equals(HttpMethod.GET) || method.equals(HttpMethod.POST)) {
return doFindFunction(method, functionCatalog, attributes, path, acceptContentTypes);
@@ -63,15 +63,14 @@ public final class FunctionWebUtils {
}
acceptContentTypes = new String[] {StringUtils.arrayToCommaDelimitedString(acceptContentTypes)};
// return acceptContentTypes;
return new String[] {};
}
private static Object doFindFunction(HttpMethod method, FunctionCatalog functionCatalog,
private static FunctionInvocationWrapper doFindFunction(HttpMethod method, FunctionCatalog functionCatalog,
Map<String, Object> attributes, String path, String[] acceptContentTypes) {
path = path.startsWith("/") ? path.substring(1) : path;
if (method.equals(HttpMethod.GET)) {
Supplier<Publisher<?>> supplier = functionCatalog.lookup(path, acceptContentTypes);
FunctionInvocationWrapper supplier = functionCatalog.lookup(path, acceptContentTypes);
if (supplier != null) {
attributes.put(WebRequestConstants.SUPPLIER, supplier);
return supplier;
@@ -89,7 +88,7 @@ public final class FunctionWebUtils {
name = builder.toString();
value = path.length() > name.length() ? path.substring(name.length() + 1)
: null;
Function<Object, Object> function = functionCatalog.lookup(name, acceptContentTypes);
FunctionInvocationWrapper function = functionCatalog.lookup(name, acceptContentTypes);
if (function != null) {
attributes.put(WebRequestConstants.FUNCTION, function);
if (value != null) {
@@ -106,6 +105,7 @@ public final class FunctionWebUtils {
return postProcessResult(result, isMessage);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static Object postProcessResult(Object result, boolean isMessage) {
if (result instanceof Flux) {
result = ((Flux) result).map(v -> postProcessResult(v, isMessage));

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.test;
import java.time.Duration;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -44,10 +45,12 @@ public class FunctionalWithInputSetTests {
@Test
public void words() throws Exception {
String reply = this.client.post().uri("/")
.body(Mono.just("[{\"value\":\"foo\"}, {\"value\":\"bar\"}]"),
String.class)
.exchange().expectStatus().isOk().expectBody(String.class).returnResult()
this.client = this.client.mutate().responseTimeout(Duration.ofSeconds(300)).build();
String reply = this.client
.post().uri("/")
.body(Mono.just("[{\"value\":\"foo\"}, {\"value\":\"bar\"}]"), String.class)
.exchange()
.expectStatus().isOk().expectBody(String.class).returnResult()
.getResponseBody();
assertThat(reply.contains("FOO")).isTrue();
assertThat(reply.contains("BAR")).isTrue();

View File

@@ -46,6 +46,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
@@ -57,6 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=reactive")
@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class })
@DirtiesContext
public class HttpGetIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;

View File

@@ -236,6 +236,7 @@ public class HttpPostIntegrationTests {
@Test
@DirtiesContext
@Disabled // not sure if this test is correct. Why does ? has to be assumed as String?
public void typelessFunctionPassingArray() throws Exception {
ResponseEntity<String> result = this.rest.exchange(
RequestEntity.post(new URI("/typelessFunctionExpectingText"))

View File

@@ -47,6 +47,7 @@ import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
@@ -58,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=servlet")
@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class })
@DirtiesContext
public class HttpGetIntegrationTests {
private static final MediaType EVENT_STREAM = MediaType.TEXT_EVENT_STREAM;
@@ -193,6 +195,7 @@ public class HttpGetIntegrationTests {
@Test
public void sentencesAcceptSse() throws Exception {
Thread.sleep(1000);
ResponseEntity<String> result = this.rest.exchange(
RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(),
String.class);

View File

@@ -101,6 +101,7 @@ public class RoutingFunctionTests {
@Test
@DirtiesContext
@Disabled // not sure if this test is correct. Why does ? has to be assumed as String?
public void testFluxFunctionPrimitive() throws Exception {
this.functionProperties.setDefinition("fluxuppercase");
ResponseEntity<String> postForEntity = this.rest