From 4699f69be51275c15ad4733173ec71acab8db601 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 24 Jan 2022 15:36:24 +0100 Subject: [PATCH] GH-792 Fix Supplier streaming in s-c-function-web Resolves #792 --- .../function-sample/pom.xml | 21 +++------ .../src/main/java/com/example/Client.java | 44 +++++++++++++++++++ .../java/com/example/SampleApplication.java | 10 +++-- .../test/java/com/example/FunctionTests.java | 9 ---- .../function/web/flux/FunctionController.java | 21 +++++---- .../function/web/mvc/FunctionController.java | 11 ++--- .../FunctionWebRequestProcessingHelper.java | 6 +-- .../web/flux/HttpPostIntegrationTests.java | 2 + .../web/function/UserSubmittedTests.java | 12 +++++ .../web/mvc/HttpPostIntegrationTests.java | 24 ++++++++-- 10 files changed, 109 insertions(+), 51 deletions(-) create mode 100644 spring-cloud-function-samples/function-sample/src/main/java/com/example/Client.java diff --git a/spring-cloud-function-samples/function-sample/pom.xml b/spring-cloud-function-samples/function-sample/pom.xml index b254a9ca4..c3981181e 100644 --- a/spring-cloud-function-samples/function-sample/pom.xml +++ b/spring-cloud-function-samples/function-sample/pom.xml @@ -1,7 +1,7 @@ + xmlns="http://maven.apache.org/POM/4.0.0" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 io.spring.sample @@ -14,29 +14,20 @@ org.springframework.boot spring-boot-starter-parent -<<<<<<< HEAD - 2.6.1 -======= 3.0.0-SNAPSHOT ->>>>>>> 4.x - + -<<<<<<< HEAD - 1.8 - 3.2.2-SNAPSHOT -======= 4.0.0-SNAPSHOT ->>>>>>> 4.x 1.0.27.RELEASE - org.springframework.boot - spring-boot-starter-actuator - + org.springframework.boot + spring-boot-starter-actuator + org.springframework.cloud spring-cloud-starter-function-webflux diff --git a/spring-cloud-function-samples/function-sample/src/main/java/com/example/Client.java b/spring-cloud-function-samples/function-sample/src/main/java/com/example/Client.java new file mode 100644 index 000000000..516fa3a1c --- /dev/null +++ b/spring-cloud-function-samples/function-sample/src/main/java/com/example/Client.java @@ -0,0 +1,44 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.example; + +import org.springframework.web.reactive.function.client.WebClient; + +/** + * Sample client to test infinite stream from function. + * + * @author Oleg Zhurakousky + * + */ +public class Client { + + public static void main(String[] args) throws Exception { + WebClient client = WebClient.create(); + WebClient.ResponseSpec responseSpec = client.post() + .uri("http://localhost:8080/infinite") + .header("accept", "text/event-stream") + .retrieve(); + + responseSpec.bodyToFlux(String.class).subscribe(v -> { + System.out.println(v); + }); + + System.in.read(); + + } + +} diff --git a/spring-cloud-function-samples/function-sample/src/main/java/com/example/SampleApplication.java b/spring-cloud-function-samples/function-sample/src/main/java/com/example/SampleApplication.java index 4fb75cf8c..f26aec07f 100644 --- a/spring-cloud-function-samples/function-sample/src/main/java/com/example/SampleApplication.java +++ b/spring-cloud-function-samples/function-sample/src/main/java/com/example/SampleApplication.java @@ -16,10 +16,12 @@ package com.example; +import java.time.Duration; import java.util.function.Function; import java.util.function.Supplier; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -55,9 +57,11 @@ public class SampleApplication { } @Bean - public Supplier> words() { - return () -> Flux.fromArray(new String[] {"foo", "bar"}); + public Supplier> infinite() { + return () -> Flux + .interval(Duration.ofSeconds(1)) + .log() + .map(counter -> String.format("Counter: %s", counter)); } } -// @checkstyle:on diff --git a/spring-cloud-function-samples/function-sample/src/test/java/com/example/FunctionTests.java b/spring-cloud-function-samples/function-sample/src/test/java/com/example/FunctionTests.java index 9248f1030..cae8e9187 100644 --- a/spring-cloud-function-samples/function-sample/src/test/java/com/example/FunctionTests.java +++ b/spring-cloud-function-samples/function-sample/src/test/java/com/example/FunctionTests.java @@ -51,15 +51,6 @@ public class FunctionTests { assertThat(output).isEqualTo("hello"); } - @Test - public void testWords() { - Flux output = this.functions.words().get(); - List results = output.collectList().block(); - assertThat(results.size()).isEqualTo(2); - assertThat(results.get(0)).isEqualTo("foo"); - assertThat(results.get(1)).isEqualTo("bar"); - } - @Test public void testGreeter() { assertThat(new Greeter().apply("World")).isEqualTo("Hello World"); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java index b04c95212..97b710ef7 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FunctionController.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.web.flux; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -74,11 +75,17 @@ public class FunctionController { return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false); } - @SuppressWarnings("unchecked") @PostMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody - public Mono> postStream(ServerWebExchange request, @RequestBody(required = false) Flux body) { - return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false); + public Publisher postStream(ServerWebExchange request, @RequestBody(required = false) Flux body) { + return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, true); + } + + @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @ResponseBody + public Publisher getStream(ServerWebExchange request) { + FunctionWrapper wrapper = wrapper(request); + return FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true); } @SuppressWarnings("unchecked") @@ -89,14 +96,6 @@ public class FunctionController { return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), false); } - @SuppressWarnings("unchecked") - @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) - @ResponseBody - public Mono> getStream(ServerWebExchange request) { - FunctionWrapper wrapper = wrapper(request); - return (Mono>) FunctionWebRequestProcessingHelper.processRequest(wrapper, wrapper.getArgument(), true); - } - private FunctionWrapper wrapper(ServerWebExchange request) { FunctionInvocationWrapper function = (FunctionInvocationWrapper) request .getAttribute(WebRequestConstants.HANDLER); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java index 0d08f3e91..f8de9c1ca 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/mvc/FunctionController.java @@ -94,21 +94,18 @@ public class FunctionController { .headers(response.getHeaders()).body((Publisher) response.getBody())); } - @SuppressWarnings("unchecked") @GetMapping(path = "/**", produces = MediaType.TEXT_EVENT_STREAM_VALUE) @ResponseBody - public Mono>> getStream(WebRequest request) { + public Publisher getStream(WebRequest request) { FunctionWrapper wrapper = wrapper(request); - return ((Mono>) FunctionWebRequestProcessingHelper - .processRequest(wrapper, wrapper.getArgument(), true)).map(response -> ResponseEntity.ok() - .headers(response.getHeaders()).body((Publisher) response.getBody())); + return FunctionWebRequestProcessingHelper + .processRequest(wrapper, wrapper.getArgument(), true); } @PostMapping(path = "/**") @ResponseBody public Object post(WebRequest request, @RequestBody(required = false) String body) { - String argument = StringUtils.hasText(body) ? body : ""; - return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), argument, false); + return FunctionWebRequestProcessingHelper.processRequest(wrapper(request), body, false); } @GetMapping(path = "/**") diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java index b78037908..7f3638a05 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebRequestProcessingHelper.java @@ -84,7 +84,7 @@ public final class FunctionWebRequestProcessingHelper { } @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Object processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) { + public static Publisher processRequest(FunctionWrapper wrapper, Object argument, boolean eventStream) { FunctionInvocationWrapper function = wrapper.getFunction(); if (function == null) { @@ -99,7 +99,7 @@ public final class FunctionWebRequestProcessingHelper { function.setSkipOutputConversion(true); } - Object input = argument == null ? Flux.empty() : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage); + Object input = argument == null ? "" : (argument instanceof Publisher ? Flux.from((Publisher) argument) : inputMessage); Object result = function.apply(input); if (function.isConsumer()) { @@ -115,7 +115,7 @@ public final class FunctionWebRequestProcessingHelper { if (result instanceof Publisher) { pResult = (Publisher) result; if (eventStream) { - return Flux.from(pResult).then(Mono.fromSupplier(() -> responseOkBuilder.body(result))); + return Flux.from(pResult); } if (pResult instanceof Flux) { diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java index 1cc7b8a0a..37d1680ad 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/flux/HttpPostIntegrationTests.java @@ -333,6 +333,8 @@ public class HttpPostIntegrationTests { @Test @DirtiesContext public void uppercaseSSE() throws Exception { + String s = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody(); assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) .body("[\"foo\",\"bar\"]"), String.class).getBody()) .isEqualTo(sse("(FOO)", "(BAR)")); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/UserSubmittedTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/UserSubmittedTests.java index ac30ea91c..f524ba30f 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/UserSubmittedTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/function/UserSubmittedTests.java @@ -64,6 +64,18 @@ public class UserSubmittedTests { assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); } + @Test + public void testIssue274WithData() throws Exception { + SpringApplication.run(Issue274Configuration.class); + TestRestTemplate testRestTemplate = new TestRestTemplate(); + String port = System.getProperty("server.port"); + Thread.sleep(200); + ResponseEntity response = testRestTemplate + .postForEntity(new URI("http://localhost:" + port + "/echo"), "hello", String.class); + assertThat(response.getBody()).isEqualTo("HELLO"); + assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK); + } + @SpringBootApplication protected static class Issue274Configuration { diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java index 2698bf70f..811d898da 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/mvc/HttpPostIntegrationTests.java @@ -290,12 +290,29 @@ public class HttpPostIntegrationTests { @Test public void uppercaseSSE() throws Exception { - assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")) - .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) + String s = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) + .body("[\"foo\",\"bar\"]"), String.class).getBody(); + assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) .body("[\"foo\",\"bar\"]"), String.class).getBody()) .isEqualTo(sse("(FOO)", "(BAR)")); } +// @Test +// public void uppercaseSSE() throws Exception { +// assertThat(this.rest.exchange(RequestEntity.post(new URI("/uppercase")) +// .accept(EVENT_STREAM).contentType(MediaType.APPLICATION_JSON) +// .body("[\"foo\",\"bar\"]"), String.class).getBody()) +// .isEqualTo(sse("(FOO)", "(BAR)")); +// +//// String body = this.rest.exchange(RequestEntity.post(new URI("/uppercase")).contentType(MediaType.APPLICATION_JSON) +//// .body("[\"foo\",\"bar\"]"), String.class).getBody(); +// +//// System.out.println(body); +// +//// assertThat(body) +//// .isEqualTo(sse("(FOO)", "(BAR)")); +// } + @Test public void sum() throws Exception { @@ -334,7 +351,8 @@ public class HttpPostIntegrationTests { } private String sse(String... values) { - return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + //return "data:" + StringUtils.arrayToDelimitedString(values, "\n\ndata:") + "\n\n"; + return "[\"" + StringUtils.arrayToDelimitedString(values, "\",\"") + "\"]"; } @EnableAutoConfiguration