GH-708 Consolidate web request processing for Flux and MVC endpoints"
This commit is contained in:
@@ -37,7 +37,7 @@ import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.message.MessageUtils;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebUtils;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.cloud.function.web.util.HeaderUtils;
|
||||
import org.springframework.core.ResolvableType;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
@@ -75,7 +75,7 @@ public class RequestProcessor {
|
||||
}
|
||||
else {
|
||||
FunctionInvocationWrapper function = (wrapper.function);
|
||||
Object result = FunctionWebUtils.invokeFunction(function, null, false);
|
||||
Object result = FunctionWebRequestProcessingHelper.invokeFunction(function, null, false);
|
||||
return response(wrapper, wrapper.function(), result instanceof Publisher ? (Publisher) result : Flux.just(result), null,
|
||||
true);
|
||||
}
|
||||
@@ -153,7 +153,7 @@ public class RequestProcessor {
|
||||
.body("Function for provided path can not be found"));
|
||||
}
|
||||
else {
|
||||
Publisher<?> result = (Publisher<?>) FunctionWebUtils.invokeFunction(function, flux, function.isInputTypeMessage());
|
||||
Publisher<?> result = (Publisher<?>) FunctionWebRequestProcessingHelper.invokeFunction(function, flux, function.isInputTypeMessage());
|
||||
if (function.isConsumer()) {
|
||||
if (result != null) {
|
||||
((Mono) result).subscribe();
|
||||
@@ -279,7 +279,7 @@ public class RequestProcessor {
|
||||
private Publisher<?> invokeFunction(FunctionWrapper wrapper) {
|
||||
if (wrapper.argument != null) {
|
||||
Flux<?> input = Flux.from(wrapper.argument);
|
||||
Object result = FunctionWebUtils.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage());
|
||||
Object result = FunctionWebRequestProcessingHelper.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage());
|
||||
return Mono.from((Publisher<?>) result);
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -16,28 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.function.web.flux;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.cloud.function.web.util.FunctionWrapper;
|
||||
import org.springframework.cloud.function.web.util.HeaderUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.ResponseEntity.BodyBuilder;
|
||||
import org.springframework.http.codec.multipart.FormFieldPart;
|
||||
import org.springframework.http.codec.multipart.Part;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.LinkedMultiValueMap;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
@@ -50,19 +39,19 @@ import org.springframework.web.server.ServerWebExchange;
|
||||
/**
|
||||
* @author Dave Syer
|
||||
* @author Mark Fisher
|
||||
* @author Oleg Zhurakousky
|
||||
*/
|
||||
@Component
|
||||
public class FunctionController {
|
||||
|
||||
private static Log logger = LogFactory.getLog(FunctionController.class);
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE)
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<?>> form(ServerWebExchange request) {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
return request.getFormData().doOnSuccess(params -> wrapper.getParams().addAll(params))
|
||||
.then(Mono.defer(() -> (Mono<ResponseEntity<?>>) this.doProcess(request, wrapper.getParams(), false)));
|
||||
.then(Mono.defer(() -> (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper
|
||||
.processRequest(wrapper, wrapper.getParams(), false)));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -72,20 +61,8 @@ public class FunctionController {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
return request.getMultipartData()
|
||||
.doOnSuccess(params -> wrapper.getParams().addAll(multi(params)))
|
||||
.then(Mono.defer(() -> (Mono<ResponseEntity<?>>) this.doProcess(request, wrapper.getParams(), false)));
|
||||
}
|
||||
|
||||
private MultiValueMap<String, String> multi(MultiValueMap<String, Part> body) {
|
||||
MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
|
||||
for (String key : body.keySet()) {
|
||||
for (Part part : body.get(key)) {
|
||||
if (part instanceof FormFieldPart) {
|
||||
FormFieldPart form = (FormFieldPart) part;
|
||||
map.add(key, form.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
return map;
|
||||
.then(Mono.defer(() -> (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper
|
||||
.processRequest(wrapper, wrapper.getParams(), false)));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -93,15 +70,14 @@ public class FunctionController {
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<?>> post(ServerWebExchange request,
|
||||
@RequestBody(required = false) String body) {
|
||||
Mono<ResponseEntity<?>> m = (Mono<ResponseEntity<?>>) this.doProcess(request, body, false);
|
||||
return m;
|
||||
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<?>> postStream(ServerWebExchange request, @RequestBody(required = false) Flux<String> body) {
|
||||
return (Mono<ResponseEntity<?>>) this.doProcess(request, body, false);
|
||||
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -109,7 +85,7 @@ public class FunctionController {
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<?>> get(ServerWebExchange request) {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
return (Mono<ResponseEntity<?>>) this.doProcess(request, wrapper.getArgument(), false);
|
||||
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -117,7 +93,7 @@ public class FunctionController {
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<?>> getStream(ServerWebExchange request) {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
return (Mono<ResponseEntity<?>>) this.doProcess(request, wrapper.getArgument(), true);
|
||||
return (Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true);
|
||||
}
|
||||
|
||||
private FunctionWrapper wrapper(ServerWebExchange request) {
|
||||
@@ -133,66 +109,16 @@ public class FunctionController {
|
||||
return wrapper;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private Object doProcess(ServerWebExchange request, Object argument, boolean eventStream) {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
|
||||
FunctionInvocationWrapper function = wrapper.getFunction();
|
||||
|
||||
HttpHeaders headers = wrapper.getHeaders();
|
||||
|
||||
Message<?> inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build();
|
||||
|
||||
if (function.isRoutingFunction()) {
|
||||
function.setSkipOutputConversion(true);
|
||||
private MultiValueMap<String, String> multi(MultiValueMap<String, Part> body) {
|
||||
MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
|
||||
for (String key : body.keySet()) {
|
||||
for (Part part : body.get(key)) {
|
||||
if (part instanceof FormFieldPart) {
|
||||
FormFieldPart form = (FormFieldPart) part;
|
||||
map.add(key, form.value());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage);
|
||||
|
||||
Object result = function.apply(input);
|
||||
if (function.isConsumer()) {
|
||||
((Mono) result).subscribe();
|
||||
return Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build());
|
||||
}
|
||||
|
||||
BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers));
|
||||
|
||||
Publisher pResult;
|
||||
if (result instanceof Publisher) {
|
||||
pResult = (Publisher) result;
|
||||
if (eventStream) {
|
||||
return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result)));
|
||||
}
|
||||
|
||||
if (pResult instanceof Flux) {
|
||||
pResult = ((Flux) pResult).onErrorContinue((e, v) -> {
|
||||
logger.error("Failed to process value: " + v, (Throwable) e);
|
||||
}).collectList();
|
||||
}
|
||||
pResult = Mono.from(pResult);
|
||||
}
|
||||
else {
|
||||
pResult = Mono.just(result);
|
||||
}
|
||||
|
||||
return Mono.from(pResult).map(v -> {
|
||||
if (v instanceof Iterable) {
|
||||
List aggregatedResult = (List) ((Collection) v).stream().map(m -> {
|
||||
return m instanceof Message ? this.doProcessMessage(responseOkBuilder, (Message<?>) m) : m;
|
||||
}).collect(Collectors.toList());
|
||||
return responseOkBuilder.header("content-type", "application/json").body(aggregatedResult);
|
||||
}
|
||||
else if (v instanceof Message) {
|
||||
return responseOkBuilder.body(this.doProcessMessage(responseOkBuilder, (Message<?>) v));
|
||||
}
|
||||
else {
|
||||
return responseOkBuilder.body(v);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private Object doProcessMessage(BodyBuilder responseOkBuilder, Message<?> message) {
|
||||
responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders()));
|
||||
return message.getPayload();
|
||||
return map;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebUtils;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.method.HandlerMethod;
|
||||
@@ -79,7 +79,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
|
||||
if (path.startsWith(this.prefix)) {
|
||||
path = path.substring(this.prefix.length());
|
||||
}
|
||||
Object function = FunctionWebUtils
|
||||
Object function = FunctionWebRequestProcessingHelper
|
||||
.findFunction(request.getRequest().getMethod(), this.functions, request.getAttributes(), path, new String[] {});
|
||||
|
||||
if (function != null) {
|
||||
|
||||
@@ -44,7 +44,7 @@ import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.cloud.function.web.RequestProcessor;
|
||||
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebUtils;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextInitializer;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
@@ -222,8 +222,8 @@ class FunctionEndpointFactory {
|
||||
function = this.functionCatalog.lookup(Function.class, handler);
|
||||
}
|
||||
else {
|
||||
String[] accept = FunctionWebUtils.acceptContentTypes(request.headers().accept());
|
||||
function = FunctionWebUtils.findFunction(request.method(), functionCatalog, request.attributes(),
|
||||
String[] accept = FunctionWebRequestProcessingHelper.acceptContentTypes(request.headers().accept());
|
||||
function = FunctionWebRequestProcessingHelper.findFunction(request.method(), functionCatalog, request.attributes(),
|
||||
request.path(), accept);
|
||||
}
|
||||
return function;
|
||||
@@ -247,7 +247,7 @@ class FunctionEndpointFactory {
|
||||
Class<?> outputType = FunctionTypeUtils
|
||||
.getRawType(FunctionTypeUtils.getGenericType(funcWrapper.getOutputType()));
|
||||
if (funcWrapper.isSupplier()) {
|
||||
Object result = FunctionWebUtils.invokeFunction(funcWrapper, null, funcWrapper.isInputTypeMessage());
|
||||
Object result = FunctionWebRequestProcessingHelper.invokeFunction(funcWrapper, null, funcWrapper.isInputTypeMessage());
|
||||
if (!(result instanceof Publisher)) {
|
||||
result = Mono.just(result);
|
||||
}
|
||||
@@ -258,7 +258,7 @@ class FunctionEndpointFactory {
|
||||
wrapper.headers(request.headers().asHttpHeaders());
|
||||
String argument = (String) request.attribute(WebRequestConstants.ARGUMENT).get();
|
||||
wrapper.argument(Flux.just(argument));
|
||||
Object result = FunctionWebUtils.invokeFunction(funcWrapper, wrapper.argument(),
|
||||
Object result = FunctionWebRequestProcessingHelper.invokeFunction(funcWrapper, wrapper.argument(),
|
||||
funcWrapper.isInputTypeMessage());
|
||||
return ServerResponse.ok().body(result, outputType);
|
||||
}
|
||||
|
||||
@@ -17,7 +17,6 @@
|
||||
package org.springframework.cloud.function.web.mvc;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -28,9 +27,8 @@ import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.cloud.function.web.util.FunctionWrapper;
|
||||
import org.springframework.cloud.function.web.util.HeaderUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.ResponseEntity.BodyBuilder;
|
||||
@@ -82,7 +80,7 @@ public class FunctionController {
|
||||
return Mono.from(result).flatMap(body -> Mono.just(builder.body(body)));
|
||||
}
|
||||
}
|
||||
return this.doProcess(request, wrapper.getParams(), false);
|
||||
return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getParams(), false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -91,7 +89,7 @@ public class FunctionController {
|
||||
public Mono<ResponseEntity<Publisher<?>>> postStream(WebRequest request,
|
||||
@RequestBody(required = false) String body) {
|
||||
String argument = StringUtils.hasText(body) ? body : "";
|
||||
return ((Mono<ResponseEntity<?>>) this.doProcess(request, argument, true)).map(response -> ResponseEntity.ok()
|
||||
return ((Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, true)).map(response -> ResponseEntity.ok()
|
||||
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
|
||||
}
|
||||
|
||||
@@ -99,8 +97,9 @@ public class FunctionController {
|
||||
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
|
||||
@ResponseBody
|
||||
public Mono<ResponseEntity<Publisher<?>>> getStream(WebRequest request) {
|
||||
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST);
|
||||
return ((Mono<ResponseEntity<?>>) this.doProcess(request, argument, true)).map(response -> ResponseEntity.ok()
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
return ((Mono<ResponseEntity<?>>) FunctionWebRequestProcessingHelper
|
||||
.processRequest(wrapper, wrapper.getArgument(), true)).map(response -> ResponseEntity.ok()
|
||||
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
|
||||
}
|
||||
|
||||
@@ -108,79 +107,14 @@ public class FunctionController {
|
||||
@ResponseBody
|
||||
public Object post(WebRequest request, @RequestBody(required = false) String body) {
|
||||
String argument = StringUtils.hasText(body) ? body : "";
|
||||
return this.doProcess(request, argument, false);
|
||||
return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, false);
|
||||
}
|
||||
|
||||
@GetMapping(path = "/**")
|
||||
@ResponseBody
|
||||
public Object get(WebRequest request) {
|
||||
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST);
|
||||
return this.doProcess(request, argument, false);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private Object doProcess(WebRequest request, Object argument, boolean eventStream) {
|
||||
FunctionWrapper wrapper = wrapper(request);
|
||||
|
||||
FunctionInvocationWrapper function = wrapper.getFunction();
|
||||
|
||||
HttpHeaders headers = wrapper.getHeaders();
|
||||
|
||||
Message<?> inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build();
|
||||
|
||||
if (function.isRoutingFunction()) {
|
||||
function.setSkipOutputConversion(true);
|
||||
}
|
||||
|
||||
Object result = function.apply(inputMessage);
|
||||
|
||||
BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers));
|
||||
|
||||
if (result instanceof Publisher) {
|
||||
Publisher p = (Publisher) result;
|
||||
if (eventStream) {
|
||||
return Flux.from(p).then(Mono.fromSupplier(() -> responseOkBuilder.body(p)));
|
||||
}
|
||||
|
||||
if (result instanceof Flux) {
|
||||
result = ((Flux) result).collectList();
|
||||
}
|
||||
|
||||
if (function.isConsumer()) {
|
||||
((Mono) result).subscribe();
|
||||
return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build();
|
||||
}
|
||||
else {
|
||||
result = Mono.from((Publisher) result).map(v -> {
|
||||
if (v instanceof Iterable) {
|
||||
List aggregatedResult = (List) ((Collection) v).stream().map(m -> {
|
||||
return m instanceof Message ? this.doProcessMessage(responseOkBuilder, (Message<?>) m) : m;
|
||||
}).collect(Collectors.toList());
|
||||
return Mono.just(responseOkBuilder.body(aggregatedResult));
|
||||
}
|
||||
else if (v instanceof Message) {
|
||||
return this.doProcessMessage(responseOkBuilder, (Message<?>) v);
|
||||
}
|
||||
else {
|
||||
return Mono.just(v);
|
||||
}
|
||||
});
|
||||
return result;
|
||||
}
|
||||
}
|
||||
else if (function.isConsumer()) {
|
||||
return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build();
|
||||
}
|
||||
else {
|
||||
return result instanceof Message ?
|
||||
responseOkBuilder.headers(HeaderUtils.fromMessage(((Message) result).getHeaders())).body(((Message) result).getPayload()) :
|
||||
responseOkBuilder.body(result);
|
||||
}
|
||||
}
|
||||
|
||||
private Object doProcessMessage(BodyBuilder responseOkBuilder, Message<?> message) {
|
||||
responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders()));
|
||||
return message.getPayload();
|
||||
return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false);
|
||||
}
|
||||
|
||||
private FunctionWrapper wrapper(WebRequest request) {
|
||||
|
||||
@@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebUtils;
|
||||
import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.util.StringUtils;
|
||||
@@ -91,7 +91,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping
|
||||
path = path.substring(this.prefix.length());
|
||||
}
|
||||
|
||||
Object function = FunctionWebUtils.findFunction(HttpMethod.resolve(request.getMethod()),
|
||||
Object function = FunctionWebRequestProcessingHelper.findFunction(HttpMethod.resolve(request.getMethod()),
|
||||
this.functions, new HttpRequestAttributeDelegate(request), path, new String[] {});
|
||||
if (function != null) {
|
||||
if (this.logger.isDebugEnabled()) {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2020 the original author or authors.
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -16,30 +16,41 @@
|
||||
|
||||
package org.springframework.cloud.function.web.util;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.web.constants.WebRequestConstants;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.ResponseEntity.BodyBuilder;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* !INTERNAL USE ONLY!
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
public final class FunctionWebUtils {
|
||||
public final class FunctionWebRequestProcessingHelper {
|
||||
|
||||
private FunctionWebUtils() {
|
||||
private static Log logger = LogFactory.getLog(FunctionWebRequestProcessingHelper.class);
|
||||
|
||||
private FunctionWebRequestProcessingHelper() {
|
||||
|
||||
}
|
||||
|
||||
@@ -66,6 +77,74 @@ public final class FunctionWebUtils {
|
||||
return new String[] {};
|
||||
}
|
||||
|
||||
public static Object invokeFunction(FunctionInvocationWrapper function, Object input, boolean isMessage) {
|
||||
Object result = function.apply(input);
|
||||
return postProcessResult(result, isMessage);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public static Object processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) {
|
||||
FunctionInvocationWrapper function = wrapper.getFunction();
|
||||
|
||||
HttpHeaders headers = wrapper.getHeaders();
|
||||
|
||||
Message<?> inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build();
|
||||
|
||||
if (function.isRoutingFunction()) {
|
||||
function.setSkipOutputConversion(true);
|
||||
}
|
||||
|
||||
Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage);
|
||||
|
||||
Object result = function.apply(input);
|
||||
if (function.isConsumer()) {
|
||||
if (result instanceof Publisher) {
|
||||
Mono.from((Publisher) result).subscribe();
|
||||
}
|
||||
return Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build());
|
||||
}
|
||||
|
||||
BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers));
|
||||
|
||||
Publisher pResult;
|
||||
if (result instanceof Publisher) {
|
||||
pResult = (Publisher) result;
|
||||
if (eventStream) {
|
||||
return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result)));
|
||||
}
|
||||
|
||||
if (pResult instanceof Flux) {
|
||||
pResult = ((Flux) pResult).onErrorContinue((e, v) -> {
|
||||
logger.error("Failed to process value: " + v, (Throwable) e);
|
||||
}).collectList();
|
||||
}
|
||||
pResult = Mono.from(pResult);
|
||||
}
|
||||
else {
|
||||
pResult = Mono.just(result);
|
||||
}
|
||||
|
||||
return Mono.from(pResult).map(v -> {
|
||||
if (v instanceof Iterable) {
|
||||
List aggregatedResult = (List) ((Collection) v).stream().map(m -> {
|
||||
return m instanceof Message ? processMessage(responseOkBuilder, (Message<?>) m) : m;
|
||||
}).collect(Collectors.toList());
|
||||
return responseOkBuilder.header("content-type", "application/json").body(aggregatedResult);
|
||||
}
|
||||
else if (v instanceof Message) {
|
||||
return responseOkBuilder.body(processMessage(responseOkBuilder, (Message<?>) v));
|
||||
}
|
||||
else {
|
||||
return responseOkBuilder.body(v);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static Object processMessage(BodyBuilder responseOkBuilder, Message<?> message) {
|
||||
responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders()));
|
||||
return message.getPayload();
|
||||
}
|
||||
|
||||
private static FunctionInvocationWrapper doFindFunction(HttpMethod method, FunctionCatalog functionCatalog,
|
||||
Map<String, Object> attributes, String path, String[] acceptContentTypes) {
|
||||
path = path.startsWith("/") ? path.substring(1) : path;
|
||||
@@ -100,11 +179,6 @@ public final class FunctionWebUtils {
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Object invokeFunction(FunctionInvocationWrapper function, Object input, boolean isMessage) {
|
||||
Object result = function.apply(input);
|
||||
return postProcessResult(result, isMessage);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
private static Object postProcessResult(Object result, boolean isMessage) {
|
||||
if (result instanceof Flux) {
|
||||
@@ -125,4 +199,5 @@ public final class FunctionWebUtils {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user