GH-708 Initial refactoring and consolidation of s-c-function-web MVC part.

This commit is contained in:
Oleg Zhurakousky
2021-06-02 19:56:21 +02:00
parent eddfa97b85
commit 4cc88f1124
6 changed files with 139 additions and 24 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.web.mvc;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -29,6 +30,8 @@ import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry
import org.springframework.cloud.function.web.RequestProcessor;
import org.springframework.cloud.function.web.RequestProcessor.FunctionWrapper;
import org.springframework.cloud.function.web.constants.WebRequestConstants;
import org.springframework.cloud.function.web.util.HeaderUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.ResponseEntity.BodyBuilder;
@@ -37,6 +40,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -89,15 +93,6 @@ public class FunctionController {
return this.processor.post(wrapper, null, false);
}
@PostMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> post(WebRequest request,
@RequestBody(required = false) String body) {
FunctionWrapper wrapper = wrapper(request);
Mono<ResponseEntity<?>> result = this.processor.post(wrapper, body, false);
return result;
}
@PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> postStream(WebRequest request,
@@ -108,13 +103,6 @@ public class FunctionController {
.body((Publisher<?>) response.getBody()));
}
@GetMapping(path = "/**")
@ResponseBody
public Mono<ResponseEntity<?>> get(WebRequest request) {
FunctionWrapper wrapper = wrapper(request);
return this.processor.get(wrapper);
}
@GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Mono<ResponseEntity<Publisher<?>>> getStream(WebRequest request) {
@@ -123,6 +111,79 @@ public class FunctionController {
.headers(response.getHeaders()).body((Publisher<?>) response.getBody()));
}
@PostMapping(path = "/**")
@ResponseBody
public Object post(WebRequest request, @RequestBody(required = false) String body) {
String argument = StringUtils.hasText(body) ? body : "";
return this.doProcess(request, argument);
}
@GetMapping(path = "/**")
@ResponseBody
public Object get(WebRequest request) {
String argument = (String) request.getAttribute(WebRequestConstants.ARGUMENT, WebRequest.SCOPE_REQUEST);
return this.doProcess(request, argument);
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private Object doProcess(WebRequest request, String argument) {
FunctionWrapper wrapper = wrapper(request);
FunctionInvocationWrapper function = wrapper.function();
HttpHeaders headers = wrapper.headers();
Message<?> inputMessage = argument == null ? null : MessageBuilder.withPayload(argument).copyHeaders(headers.toSingleValueMap()).build();
if (function.isRoutingFunction()) {
function.setSkipOutputConversion(true);
}
Object result = function.apply(inputMessage);
BodyBuilder responseOkBuilder = ResponseEntity.ok().headers(HeaderUtils.sanitize(headers));
if (result instanceof Publisher) {
if (result instanceof Flux) {
result = ((Flux) result).collectList();
}
if (function.isConsumer()) {
((Mono) result).subscribe();
return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build();
}
else {
result = Mono.from((Publisher) result).map(v -> {
if (v instanceof Iterable) {
List aggregatedResult = (List) ((Collection) v).stream().map(m -> {
return m instanceof Message ? this.doProcessMessage(responseOkBuilder, (Message<?>) m) : m;
}).collect(Collectors.toList());
return Mono.just(responseOkBuilder.body(aggregatedResult));
}
else if (v instanceof Message) {
return this.doProcessMessage(responseOkBuilder, (Message<?>) v);
}
else {
return Mono.just(v);
}
});
return result;
}
}
else if (function.isConsumer()) {
return ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers)).build();
}
else {
return result instanceof Message ?
responseOkBuilder.headers(HeaderUtils.fromMessage(((Message) result).getHeaders())).body(((Message) result).getPayload()) :
responseOkBuilder.body(result);
}
}
private Object doProcessMessage(BodyBuilder responseOkBuilder, Message<?> message) {
responseOkBuilder.headers(HeaderUtils.fromMessage(message.getHeaders()));
return message.getPayload();
}
private FunctionWrapper wrapper(WebRequest request) {
FunctionInvocationWrapper function = (FunctionInvocationWrapper) request
.getAttribute(WebRequestConstants.HANDLER, WebRequest.SCOPE_REQUEST);

View File

@@ -209,19 +209,19 @@ public class HttpGetIntegrationTests {
@Test
public void postMoreFoo() {
assertThat(this.rest.getForObject("/post/more/foo", String.class))
.isEqualTo("(FOO)");
.isEqualTo("[\"(FOO)\"]");
}
@Test
public void uppercaseGet() {
assertThat(this.rest.getForObject("/uppercase/foo", String.class))
.isEqualTo("(FOO)");
.isEqualTo("[\"(FOO)\"]");
}
@Test
public void convertGet() {
assertThat(this.rest.getForObject("/wrap/123", String.class))
.isEqualTo("..123..");
.isEqualTo("[\"..123..\"]");
}
@Test
@@ -235,10 +235,12 @@ public class HttpGetIntegrationTests {
assertThat(this.rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
.getBody()).isEqualTo("[{\"value\":321}]");
}
@Test
@Disabled
// this test is wrong since it is returning Flux while setting CT to TEXT_PLAIN. We can't convert it
public void compose() throws Exception {
ResponseEntity<String> result = this.rest.exchange(RequestEntity
.get(new URI("/concat,reverse/foo")).accept(MediaType.TEXT_PLAIN).build(),
@@ -338,7 +340,7 @@ public class HttpGetIntegrationTests {
public Supplier<Flux<String>> timeout() {
return () -> Flux.defer(() -> Flux.<String>create(emitter -> {
emitter.next("foo");
}).timeout(Duration.ofMillis(100L), Flux.empty()));
}).timeout(Duration.ofMillis(1000L), Flux.empty()));
}
@Bean

View File

@@ -420,7 +420,9 @@ public class HttpPostIntegrationTests {
@Bean
public Consumer<String> bareUpdates() {
return value -> this.list.add(value);
return value -> {
this.list.add(value);
};
}
@Bean("not/a")