GH-783 Fix Consumer processing with webflux

Resolves #783
This commit is contained in:
Oleg Zhurakousky
2021-12-13 16:08:21 +01:00
parent 7e5b263ef6
commit fedae1cb5f
2 changed files with 38 additions and 2 deletions

View File

@@ -64,6 +64,7 @@ import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.ServerResponse.BodyBuilder;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@@ -244,8 +245,13 @@ class FunctionEndpointFactory {
Mono<ResponseEntity<?>> stream = request.bodyToMono(String.class)
.flatMap(content -> this.processor.post(wrapper, content, false));
return stream.flatMap(entity -> {
return status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders()))
.body(entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType);
BodyBuilder builder = status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders()));
if (outputType == null) { // consumer
return builder.build();
}
else {
return builder.body(entity != null && entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType);
}
});
}).andRoute(GET("/**"), request -> {
FunctionInvocationWrapper funcWrapper = extract(request);