diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java index 46e2cf04b..fcff83d42 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -56,15 +56,6 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { @Override public void onSubscribe(Subscription subscription) { - if (!MediaType.ALL.equals(mediaType) - && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { - try { - this.responseBodyEmitter.send("["); - } - catch (IOException e) { - // Urgh? - } - } this.subscription = subscription; subscription.request(Long.MAX_VALUE); } @@ -78,6 +69,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { if (!this.firstElementWritten) { + responseBodyEmitter.send("["); this.firstElementWritten = true; } else { @@ -101,7 +93,21 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { @Override public void onError(Throwable e) { - responseBodyEmitter.completeWithError(e); + if (!completed) { + completed = true; + try { + if (!MediaType.ALL.equals(mediaType) + && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { + if (this.firstElementWritten) { + responseBodyEmitter.send("]"); + } + } + responseBodyEmitter.completeWithError(e); + } + catch (IOException ex) { + throw new RuntimeException(ex.getMessage(), ex); + } + } } @Override @@ -112,7 +118,7 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { if (!this.firstElementWritten) { - this.firstElementWritten = true; + responseBodyEmitter.send("["); } responseBodyEmitter.send("]"); } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index 31c35c213..e50101e32 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -24,9 +24,11 @@ import java.util.Map; import java.util.function.Function; import java.util.function.Supplier; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.embedded.LocalServerPort; import org.springframework.boot.test.context.SpringBootTest; @@ -54,66 +56,70 @@ public class RestApplicationTests { private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); @LocalServerPort private int port; - private TestRestTemplate rest = new TestRestTemplate(); + @Autowired + private TestRestTemplate rest; @Test public void wordsSSE() throws Exception { assertThat(rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/words")) - .accept(EVENT_STREAM).build(), + RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(), String.class).getBody()).isEqualTo(sse("foo", "bar")); } @Test public void wordsJson() throws Exception { assertThat(rest - .exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/words")) - .accept(MediaType.APPLICATION_JSON).build(), - String.class) + .exchange(RequestEntity.get(new URI("/words")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) .getBody()).isEqualTo("[\"foo\",\"bar\"]"); } + @Test + @Ignore("Fix error handling") + public void errorJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/bang")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + @Test public void words() throws Exception { - assertThat(rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/words")).build(), - String.class).getBody()).isEqualTo("foobar"); + assertThat( + rest.exchange(RequestEntity.get(new URI("/words")).build(), String.class) + .getBody()).isEqualTo("foobar"); } @Test public void emptyJson() throws Exception { assertThat(rest - .exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/empty")) - .accept(MediaType.APPLICATION_JSON).build(), - String.class) + .exchange(RequestEntity.get(new URI("/empty")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) .getBody()).isEqualTo("[]"); } @Test public void sentences() throws Exception { - assertThat(rest.exchange(RequestEntity - .get(new URI("http://localhost:" + port + "/sentences")).build(), - String.class).getBody()) - .isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]"); + assertThat(rest + .exchange(RequestEntity.get(new URI("/sentences")).build(), String.class) + .getBody()).isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]"); } @Test public void sentencesAcceptAny() throws Exception { assertThat(rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/sentences")) - .accept(MediaType.ALL).build(), + RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(), String.class).getBody()) .isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]"); } @Test public void sentencesAcceptJson() throws Exception { - ResponseEntity result = rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/sentences")) - .accept(MediaType.APPLICATION_JSON).build(), - String.class); + ResponseEntity result = rest + .exchange( + RequestEntity.get(new URI("/sentences")) + .accept(MediaType.APPLICATION_JSON).build(), + String.class); assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]"); assertThat(result.getHeaders().getContentType()) .isEqualTo(MediaType.APPLICATION_JSON); @@ -122,8 +128,7 @@ public class RestApplicationTests { @Test public void sentencesAcceptSse() throws Exception { ResponseEntity result = rest.exchange( - RequestEntity.get(new URI("http://localhost:" + port + "/sentences")) - .accept(EVENT_STREAM).build(), + RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(), String.class); assertThat(result.getBody()) .isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]")); @@ -133,14 +138,14 @@ public class RestApplicationTests { @Test public void uppercase() { - assertThat(rest.postForObject("http://localhost:" + port + "/uppercase", - "foo\nbar", String.class)).isEqualTo("[FOO][BAR]"); + assertThat(rest.postForObject("/uppercase", "foo\nbar", String.class)) + .isEqualTo("[FOO][BAR]"); } @Test public void uppercaseJsonArray() throws Exception { assertThat(rest.exchange( - RequestEntity.post(new URI("http://localhost:" + port + "/maps")) + RequestEntity.post(new URI("/maps")) .contentType(MediaType.APPLICATION_JSON) // The new line in the middle is optional .body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"), @@ -151,21 +156,20 @@ public class RestApplicationTests { @Test public void uppercaseJsonStream() throws Exception { assertThat(rest - .exchange( - RequestEntity.post(new URI("http://localhost:" + port + "/maps")) - .contentType(MediaType.APPLICATION_JSON) - // TODO: make this work without newline separator - .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), - String.class) + .exchange(RequestEntity.post(new URI("/maps")) + .contentType(MediaType.APPLICATION_JSON) + // TODO: make this work without newline separator + .body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class) .getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}"); } @Test public void uppercaseSSE() throws Exception { - assertThat(rest.exchange(RequestEntity - .post(new URI("http://localhost:" + port + "/uppercase")) - .accept(EVENT_STREAM).contentType(EVENT_STREAM).body(sse("foo", "bar")), - String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); + assertThat( + rest.exchange( + RequestEntity.post(new URI("/uppercase")).accept(EVENT_STREAM) + .contentType(EVENT_STREAM).body(sse("foo", "bar")), + String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]")); } private String sse(String... values) { @@ -194,6 +198,16 @@ public class RestApplicationTests { return () -> Flux.fromArray(new String[] { "foo", "bar" }); } + @Bean + public Supplier> bang() { + return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> { + if (value.equals("bar")) { + throw new RuntimeException("Bar"); + } + return value; + }); + } + @Bean public Supplier> empty() { return () -> Flux.fromIterable(Collections.emptyList());