From fedae1cb5f6bbd4000e5cacf8286c7faef79a4f7 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 13 Dec 2021 16:08:21 +0100 Subject: [PATCH] GH-783 Fix Consumer processing with webflux Resolves #783 --- .../function/FunctionEndpointInitializer.java | 10 +++++-- .../FunctionEndpointInitializerTests.java | 30 +++++++++++++++++++ 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java index 9075a0a42..899b92c15 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializer.java @@ -64,6 +64,7 @@ import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; +import org.springframework.web.reactive.function.server.ServerResponse.BodyBuilder; import org.springframework.web.server.WebExceptionHandler; import org.springframework.web.server.adapter.HttpWebHandlerAdapter; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; @@ -244,8 +245,13 @@ class FunctionEndpointFactory { Mono> stream = request.bodyToMono(String.class) .flatMap(content -> this.processor.post(wrapper, content, false)); return stream.flatMap(entity -> { - return status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders())) - .body(entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType); + BodyBuilder builder = status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders())); + if (outputType == null) { // consumer + return builder.build(); + } + else { + return builder.body(entity != null && entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType); + } }); }).andRoute(GET("/**"), request -> { FunctionInvocationWrapper funcWrapper = extract(request); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializerTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializerTests.java index cc6295ff0..721ae6e23 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializerTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/FunctionEndpointInitializerTests.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.web.function; import java.net.URI; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -67,6 +68,18 @@ public class FunctionEndpointInitializerTests { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND); } + @Test + public void testConsumerMapping() throws Exception { + FunctionalSpringApplication.run(ConsumerConfiguration.class); + TestRestTemplate testRestTemplate = new TestRestTemplate(); + String port = System.getProperty("server.port"); + Thread.sleep(200); + ResponseEntity response = testRestTemplate + .postForEntity(new URI("http://localhost:" + port + "/uppercase"), "stressed", String.class); + assertThat(response.getBody()).isNull(); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED); + } + @Test public void testSingleFunctionMapping() throws Exception { FunctionalSpringApplication.run(ApplicationConfiguration.class); @@ -114,6 +127,23 @@ public class FunctionEndpointInitializerTests { assertThat(response.getBody()).isEqualTo("Jim Lahey"); } + @SpringBootConfiguration + protected static class ConsumerConfiguration + implements ApplicationContextInitializer { + + public Consumer consume() { + return v -> System.out.println(v); + } + + @Override + public void initialize(GenericApplicationContext applicationContext) { + applicationContext.registerBean("consume", FunctionRegistration.class, + () -> new FunctionRegistration<>(consume()) + .type(FunctionType.consumer(String.class))); + } + + } + @SpringBootConfiguration protected static class ApplicationConfiguration