GH-783 Fix Consumer processing with webflux

Resolves #783
This commit is contained in:
Oleg Zhurakousky
2021-12-13 16:08:21 +01:00
parent a86555a47e
commit a843b05d38
2 changed files with 38 additions and 2 deletions

View File

@@ -64,6 +64,7 @@ import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.ServerResponse.BodyBuilder;
import org.springframework.web.server.WebExceptionHandler;
import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
@@ -244,8 +245,13 @@ class FunctionEndpointFactory {
Mono<ResponseEntity<?>> stream = request.bodyToMono(String.class)
.flatMap(content -> this.processor.post(wrapper, content, false));
return stream.flatMap(entity -> {
return status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders()))
.body(entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType);
BodyBuilder builder = status(entity.getStatusCode()).headers(headers -> headers.addAll(entity.getHeaders()));
if (outputType == null) { // consumer
return builder.build();
}
else {
return builder.body(entity != null && entity.hasBody() ? Mono.just((T) entity.getBody()) : Mono.empty(), outputType);
}
});
}).andRoute(GET("/**"), request -> {
FunctionInvocationWrapper funcWrapper = extract(request);

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.web.function;
import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -67,6 +68,18 @@ public class FunctionEndpointInitializerTests {
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.NOT_FOUND);
}
@Test
public void testConsumerMapping() throws Exception {
FunctionalSpringApplication.run(ConsumerConfiguration.class);
TestRestTemplate testRestTemplate = new TestRestTemplate();
String port = System.getProperty("server.port");
Thread.sleep(200);
ResponseEntity<String> response = testRestTemplate
.postForEntity(new URI("http://localhost:" + port + "/uppercase"), "stressed", String.class);
assertThat(response.getBody()).isNull();
assertThat(response.getStatusCode()).isEqualTo(HttpStatus.ACCEPTED);
}
@Test
public void testSingleFunctionMapping() throws Exception {
FunctionalSpringApplication.run(ApplicationConfiguration.class);
@@ -114,6 +127,23 @@ public class FunctionEndpointInitializerTests {
assertThat(response.getBody()).isEqualTo("Jim Lahey");
}
@SpringBootConfiguration
protected static class ConsumerConfiguration
implements ApplicationContextInitializer<GenericApplicationContext> {
public Consumer<String> consume() {
return v -> System.out.println(v);
}
@Override
public void initialize(GenericApplicationContext applicationContext) {
applicationContext.registerBean("consume", FunctionRegistration.class,
() -> new FunctionRegistration<>(consume())
.type(FunctionType.consumer(String.class)));
}
}
@SpringBootConfiguration
protected static class ApplicationConfiguration