From ddc08de2d1c9dd70a9bf9d3278fd1eab55480fac Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 17 Jun 2021 16:53:52 +0200 Subject: [PATCH] GH-708 Consolidate web request processing for Flux and MVC endpoints" --- .../cloud/function/web/RequestProcessor.java | 8 +- .../function/web/flux/FunctionController.java | 114 +++--------------- .../web/flux/FunctionHandlerMapping.java | 4 +- .../function/FunctionEndpointInitializer.java | 10 +- .../function/web/mvc/FunctionController.java | 82 ++----------- .../web/mvc/FunctionHandlerMapping.java | 4 +- ...> FunctionWebRequestProcessingHelper.java} | 93 ++++++++++++-- 7 files changed, 125 insertions(+), 190 deletions(-) rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/{FunctionWebUtils.java => FunctionWebRequestProcessingHelper.java} (60%) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index b2d18de9a..1f7a97b01 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -37,7 +37,7 @@ import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.message.MessageUtils; import org.springframework.cloud.function.json.JsonMapper; -import org.springframework.cloud.function.web.util.FunctionWebUtils; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.core.ResolvableType; import org.springframework.http.HttpHeaders; @@ -75,7 +75,7 @@ public class RequestProcessor { } else { FunctionInvocationWrapper function = (wrapper.function); - Object result = FunctionWebUtils.invokeFunction(function, null, false); + Object result = FunctionWebRequestProcessingHelper.invokeFunction(function, null, false); return response(wrapper, wrapper.function(), result instanceof Publisher ? (Publisher) result : Flux.just(result), null, true); } @@ -153,7 +153,7 @@ public class RequestProcessor { .body("Function for provided path can not be found")); } else { - Publisher result = (Publisher) FunctionWebUtils.invokeFunction(function, flux, function.isInputTypeMessage()); + Publisher result = (Publisher) FunctionWebRequestProcessingHelper.invokeFunction(function, flux, function.isInputTypeMessage()); if (function.isConsumer()) { if (result != null) { ((Mono) result).subscribe(); @@ -279,7 +279,7 @@ public class RequestProcessor { private Publisher invokeFunction(FunctionWrapper wrapper) { if (wrapper.argument != null) { Flux input = Flux.from(wrapper.argument); - Object result = FunctionWebUtils.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage()); + Object result = FunctionWebRequestProcessingHelper.invokeFunction(wrapper.function, input, wrapper.function.isInputTypeMessage()); return Mono.from((Publisher) result); } else { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index 87d5b3f73..bd3939a78 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -16,28 +16,17 @@ package org.springframework.cloud.function.web.flux; -import java.util.Collection; -import java.util.List; -import java.util.stream.Collectors; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.cloud.function.web.util.FunctionWrapper; -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; -import org.springframework.http.ResponseEntity.BodyBuilder; import org.springframework.http.codec.multipart.FormFieldPart; import org.springframework.http.codec.multipart.Part; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -50,19 +39,19 @@ import org.springframework.web.server.ServerWebExchange; /** * @author Dave Syer * @author Mark Fisher + * @author Oleg Zhurakousky */ @Component public class FunctionController { - private static Log logger = LogFactory.getLog(FunctionController.class); - @SuppressWarnings("unchecked") @PostMapping(path = "/**", consumes = MediaType.APPLICATION_FORM_URLENCODED_VALUE) @ResponseBody public Mono> form(ServerWebExchange request) { FunctionWrapper wrapper = wrapper(request); return request.getFormData().doOnSuccess(params -> wrapper.getParams().addAll(params)) - .then(Mono.defer(() -> (Mono>) this.doProcess(request, wrapper.getParams(), false))); + .then(Mono.defer(() -> (Mono>) FunctionWebRequestProcessingHelper + .processRequest(wrapper, wrapper.getParams(), false))); } @SuppressWarnings("unchecked") @@ -72,20 +61,8 @@ public class FunctionController { FunctionWrapper wrapper = wrapper(request); return request.getMultipartData() .doOnSuccess(params -> wrapper.getParams().addAll(multi(params))) - .then(Mono.defer(() -> (Mono>) this.doProcess(request, wrapper.getParams(), false))); - } - - private MultiValueMap multi(MultiValueMap body) { - MultiValueMap map = new LinkedMultiValueMap<>(); - for (String key : body.keySet()) { - for (Part part : body.get(key)) { - if (part instanceof FormFieldPart) { - FormFieldPart form = (FormFieldPart) part; - map.add(key, form.value()); - } - } - } - return map; + .then(Mono.defer(() -> (Mono>) FunctionWebRequestProcessingHelper + .processRequest(wrapper, wrapper.getParams(), false))); } @SuppressWarnings("unchecked") @@ -93,15 +70,14 @@ public class FunctionController { @ResponseBody public Mono> post(ServerWebExchange request, @RequestBody(required = false) String body) { - Mono> m = (Mono>) this.doProcess(request, body, false); - return m; + return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false); } @SuppressWarnings("unchecked") @PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody public Mono> postStream(ServerWebExchange request, @RequestBody(required = false) Flux body) { - return (Mono>) this.doProcess(request, body, false); + return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false); } @SuppressWarnings("unchecked") @@ -109,7 +85,7 @@ public class FunctionController { @ResponseBody public Mono> get(ServerWebExchange request) { FunctionWrapper wrapper = wrapper(request); - return (Mono>) this.doProcess(request, wrapper.getArgument(), false); + return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false); } @SuppressWarnings("unchecked") @@ -117,7 +93,7 @@ public class FunctionController { @ResponseBody public Mono> getStream(ServerWebExchange request) { FunctionWrapper wrapper = wrapper(request); - return (Mono>) this.doProcess(request, wrapper.getArgument(), true); + return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true); } private FunctionWrapper wrapper(ServerWebExchange request) { @@ -133,66 +109,16 @@ public class FunctionController { return wrapper; } - @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object doProcess(ServerWebExchange request, Object argument, boolean eventStream) { - FunctionWrapper wrapper = wrapper(request); - - FunctionInvocationWrapper function = wrapper.getFunction(); - - HttpHeaders headers = wrapper.getHeaders(); - - Message inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build(); - - if (function.isRoutingFunction()) { - function.setSkipOutputConversion(true); + private MultiValueMap multi(MultiValueMap body) { + MultiValueMap map = new LinkedMultiValueMap<>(); + for (String key : body.keySet()) { + for (Part part : body.get(key)) { + if (part instanceof FormFieldPart) { + FormFieldPart form = (FormFieldPart) part; + map.add(key, form.value()); + } + } } - - Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage); - - Object result = function.apply(input); - if (function.isConsumer()) { - ((Mono) result).subscribe(); - return Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build()); - } - - BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers)); - - Publisher pResult; - if (result instanceof Publisher) { - pResult = (Publisher) result; - if (eventStream) { - return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result))); - } - - if (pResult instanceof Flux) { - pResult = ((Flux) pResult).onErrorContinue((e, v) -> { - logger.error("Failed to process value: " + v, (Throwable) e); - }).collectList(); - } - pResult = Mono.from(pResult); - } - else { - pResult = Mono.just(result); - } - - return Mono.from(pResult).map(v -> { - if (v instanceof Iterable) { - List aggregatedResult = (List) ((Collection) v).stream().map(m -> { - return m instanceof Message ? this.doProcessMessage(responseOkBuilder, (Message) m) : m; - }).collect(Collectors.toList()); - return responseOkBuilder.header("content-type", "application/json").body(aggregatedResult); - } - else if (v instanceof Message) { - return responseOkBuilder.body(this.doProcessMessage(responseOkBuilder, (Message) v)); - } - else { - return responseOkBuilder.body(v); - } - }); - } - - private Object doProcessMessage(BodyBuilder responseOkBuilder, Message message) { - responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders())); - return message.getPayload(); + return map; } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java index 33d1f5a1b..3b13d660b 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionHandlerMapping.java @@ -25,7 +25,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.web.constants.WebRequestConstants; -import org.springframework.cloud.function.web.util.FunctionWebUtils; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.context.annotation.Configuration; import org.springframework.util.StringUtils; import org.springframework.web.method.HandlerMethod; @@ -79,7 +79,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping if (path.startsWith(this.prefix)) { path = path.substring(this.prefix.length()); } - Object function = FunctionWebUtils + Object function = FunctionWebRequestProcessingHelper .findFunction(request.getRequest().getMethod(), this.functions, request.getAttributes(), path, new String[] {}); if (function != null) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index 22b40c8d7..be6a40f1c 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -44,7 +44,7 @@ import org.springframework.cloud.function.json.JsonMapper; import org.springframework.cloud.function.web.RequestProcessor; import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper; import org.springframework.cloud.function.web.constants.WebRequestConstants; -import org.springframework.cloud.function.web.util.FunctionWebUtils; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextInitializer; import org.springframework.context.ApplicationEvent; @@ -222,8 +222,8 @@ class FunctionEndpointFactory { function = this.functionCatalog.lookup(Function.class, handler); } else { - String[] accept = FunctionWebUtils.acceptContentTypes(request.headers().accept()); - function = FunctionWebUtils.findFunction(request.method(), functionCatalog, request.attributes(), + String[] accept = FunctionWebRequestProcessingHelper.acceptContentTypes(request.headers().accept()); + function = FunctionWebRequestProcessingHelper.findFunction(request.method(), functionCatalog, request.attributes(), request.path(), accept); } return function; @@ -247,7 +247,7 @@ class FunctionEndpointFactory { Class outputType = FunctionTypeUtils .getRawType(FunctionTypeUtils.getGenericType(funcWrapper.getOutputType())); if (funcWrapper.isSupplier()) { - Object result = FunctionWebUtils.invokeFunction(funcWrapper, null, funcWrapper.isInputTypeMessage()); + Object result = FunctionWebRequestProcessingHelper.invokeFunction(funcWrapper, null, funcWrapper.isInputTypeMessage()); if (!(result instanceof Publisher)) { result = Mono.just(result); } @@ -258,7 +258,7 @@ class FunctionEndpointFactory { wrapper.headers(request.headers().asHttpHeaders()); String argument = (String) request.attribute(WebRequestConstants.ARGUMENT).get(); wrapper.argument(Flux.just(argument)); - Object result = FunctionWebUtils.invokeFunction(funcWrapper, wrapper.argument(), + Object result = FunctionWebRequestProcessingHelper.invokeFunction(funcWrapper, wrapper.argument(), funcWrapper.isInputTypeMessage()); return ServerResponse.ok().body(result, outputType); } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java index 8dbc3f6b2..742bc1798 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java @@ -17,7 +17,6 @@ package org.springframework.cloud.function.web.mvc; import java.util.Arrays; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -28,9 +27,8 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.cloud.function.web.util.FunctionWrapper; -import org.springframework.cloud.function.web.util.HeaderUtils; -import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity.BodyBuilder; @@ -82,7 +80,7 @@ public class FunctionController { return Mono.from(result).flatMap(body -> Mono.just(builder.body(body))); } } - return this.doProcess(request, wrapper.getParams(), false); + return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getParams(), false); } @SuppressWarnings("unchecked") @@ -91,7 +89,7 @@ public class FunctionController { public Mono>> postStream(WebRequest request, @RequestBody(required = false) String body) { String argument = StringUtils.hasText(body) ? body : ""; - return ((Mono>) this.doProcess(request, argument, true)).map(response -> ResponseEntity.ok() + return ((Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, true)).map(response -> ResponseEntity.ok() .headers(response.getHeaders()).body((Publisher) response.getBody())); } @@ -99,8 +97,9 @@ public class FunctionController { @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody public Mono>> getStream(WebRequest request) { - String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST); - return ((Mono>) this.doProcess(request, argument, true)).map(response -> ResponseEntity.ok() + FunctionWrapper wrapper = wrapper(request); + return ((Mono>) FunctionWebRequestProcessingHelper + .processRequest(wrapper, wrapper.getArgument(), true)).map(response -> ResponseEntity.ok() .headers(response.getHeaders()).body((Publisher) response.getBody())); } @@ -108,79 +107,14 @@ public class FunctionController { @ResponseBody public Object post(WebRequest request, @RequestBody(required = false) String body) { String argument = StringUtils.hasText(body) ? body : ""; - return this.doProcess(request, argument, false); + return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, false); } @GetMapping(path = "/**") @ResponseBody public Object get(WebRequest request) { - String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST); - return this.doProcess(request, argument, false); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object doProcess(WebRequest request, Object argument, boolean eventStream) { FunctionWrapper wrapper = wrapper(request); - - FunctionInvocationWrapper function = wrapper.getFunction(); - - HttpHeaders headers = wrapper.getHeaders(); - - Message inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build(); - - if (function.isRoutingFunction()) { - function.setSkipOutputConversion(true); - } - - Object result = function.apply(inputMessage); - - BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers)); - - if (result instanceof Publisher) { - Publisher p = (Publisher) result; - if (eventStream) { - return Flux.from(p).then(Mono.fromSupplier(() -> responseOkBuilder.body(p))); - } - - if (result instanceof Flux) { - result = ((Flux) result).collectList(); - } - - if (function.isConsumer()) { - ((Mono) result).subscribe(); - return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build(); - } - else { - result = Mono.from((Publisher) result).map(v -> { - if (v instanceof Iterable) { - List aggregatedResult = (List) ((Collection) v).stream().map(m -> { - return m instanceof Message ? this.doProcessMessage(responseOkBuilder, (Message) m) : m; - }).collect(Collectors.toList()); - return Mono.just(responseOkBuilder.body(aggregatedResult)); - } - else if (v instanceof Message) { - return this.doProcessMessage(responseOkBuilder, (Message) v); - } - else { - return Mono.just(v); - } - }); - return result; - } - } - else if (function.isConsumer()) { - return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build(); - } - else { - return result instanceof Message ? - responseOkBuilder.headers(HeaderUtils.fromMessage(((Message) result).getHeaders())).body(((Message) result).getPayload()) : - responseOkBuilder.body(result); - } - } - - private Object doProcessMessage(BodyBuilder responseOkBuilder, Message message) { - responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders())); - return message.getPayload(); + return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false); } private FunctionWrapper wrapper(WebRequest request) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java index 29051d4b8..8db4ac9f2 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionHandlerMapping.java @@ -26,7 +26,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.web.constants.WebRequestConstants; -import org.springframework.cloud.function.web.util.FunctionWebUtils; +import org.springframework.cloud.function.web.util.FunctionWebRequestProcessingHelper; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpMethod; import org.springframework.util.StringUtils; @@ -91,7 +91,7 @@ public class FunctionHandlerMapping extends RequestMappingHandlerMapping path = path.substring(this.prefix.length()); } - Object function = FunctionWebUtils.findFunction(HttpMethod.resolve(request.getMethod()), + Object function = FunctionWebRequestProcessingHelper.findFunction(HttpMethod.resolve(request.getMethod()), this.functions, new HttpRequestAttributeDelegate(request), path, new String[] {}); if (function != null) { if (this.logger.isDebugEnabled()) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java similarity index 60% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java index c8b76cf04..19f83fa34 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,30 +16,41 @@ package org.springframework.cloud.function.web.util; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; - +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.web.constants.WebRequestConstants; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.http.ResponseEntity.BodyBuilder; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; /** + * !INTERNAL USE ONLY! + * * @author Oleg Zhurakousky * */ -public final class FunctionWebUtils { +public final class FunctionWebRequestProcessingHelper { - private FunctionWebUtils() { + private static Log logger = LogFactory.getLog(FunctionWebRequestProcessingHelper.class); + + private FunctionWebRequestProcessingHelper() { } @@ -66,6 +77,74 @@ public final class FunctionWebUtils { return new String[] {}; } + public static Object invokeFunction(FunctionInvocationWrapper function, Object input, boolean isMessage) { + Object result = function.apply(input); + return postProcessResult(result, isMessage); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Object processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) { + FunctionInvocationWrapper function = wrapper.getFunction(); + + HttpHeaders headers = wrapper.getHeaders(); + + Message inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build(); + + if (function.isRoutingFunction()) { + function.setSkipOutputConversion(true); + } + + Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage); + + Object result = function.apply(input); + if (function.isConsumer()) { + if (result instanceof Publisher) { + Mono.from((Publisher) result).subscribe(); + } + return Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build()); + } + + BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers)); + + Publisher pResult; + if (result instanceof Publisher) { + pResult = (Publisher) result; + if (eventStream) { + return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result))); + } + + if (pResult instanceof Flux) { + pResult = ((Flux) pResult).onErrorContinue((e, v) -> { + logger.error("Failed to process value: " + v, (Throwable) e); + }).collectList(); + } + pResult = Mono.from(pResult); + } + else { + pResult = Mono.just(result); + } + + return Mono.from(pResult).map(v -> { + if (v instanceof Iterable) { + List aggregatedResult = (List) ((Collection) v).stream().map(m -> { + return m instanceof Message ? processMessage(responseOkBuilder, (Message) m) : m; + }).collect(Collectors.toList()); + return responseOkBuilder.header("content-type", "application/json").body(aggregatedResult); + } + else if (v instanceof Message) { + return responseOkBuilder.body(processMessage(responseOkBuilder, (Message) v)); + } + else { + return responseOkBuilder.body(v); + } + }); + } + + private static Object processMessage(BodyBuilder responseOkBuilder, Message message) { + responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders())); + return message.getPayload(); + } + private static FunctionInvocationWrapper doFindFunction(HttpMethod method, FunctionCatalog functionCatalog, Map attributes, String path, String[] acceptContentTypes) { path = path.startsWith("/") ? path.substring(1) : path; @@ -100,11 +179,6 @@ public final class FunctionWebUtils { return null; } - public static Object invokeFunction(FunctionInvocationWrapper function, Object input, boolean isMessage) { - Object result = function.apply(input); - return postProcessResult(result, isMessage); - } - @SuppressWarnings({ "unchecked", "rawtypes" }) private static Object postProcessResult(Object result, boolean isMessage) { if (result instanceof Flux) { @@ -125,4 +199,5 @@ public final class FunctionWebUtils { } return result; } + }