Tweak SSE emitter a bit so that it handles empty responses

An empty response should either be empty or complete JSON. I went
for the latter.
This commit is contained in:
Dave Syer
2017-03-08 15:02:35 +00:00
parent 0c4dcfaf72
commit ec097a563d
2 changed files with 70 additions and 50 deletions

View File

@@ -56,15 +56,6 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
@Override
public void onSubscribe(Subscription subscription) {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
try {
this.responseBodyEmitter.send("[");
}
catch (IOException e) {
// Urgh?
}
}
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@@ -78,6 +69,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
this.firstElementWritten = true;
}
else {
@@ -101,7 +93,21 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
@Override
public void onError(Throwable e) {
responseBodyEmitter.completeWithError(e);
if (!completed) {
completed = true;
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (this.firstElementWritten) {
responseBodyEmitter.send("]");
}
}
responseBodyEmitter.completeWithError(e);
}
catch (IOException ex) {
throw new RuntimeException(ex.getMessage(), ex);
}
}
}
@Override
@@ -112,7 +118,7 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
this.firstElementWritten = true;
responseBodyEmitter.send("[");
}
responseBodyEmitter.send("]");
}

View File

@@ -24,9 +24,11 @@ import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
@@ -54,66 +56,70 @@ public class RestApplicationTests {
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
@LocalServerPort
private int port;
private TestRestTemplate rest = new TestRestTemplate();
@Autowired
private TestRestTemplate rest;
@Test
public void wordsSSE() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/words"))
.accept(EVENT_STREAM).build(),
RequestEntity.get(new URI("/words")).accept(EVENT_STREAM).build(),
String.class).getBody()).isEqualTo(sse("foo", "bar"));
}
@Test
public void wordsJson() throws Exception {
assertThat(rest
.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/words"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class)
.exchange(RequestEntity.get(new URI("/words"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\",\"bar\"]");
}
@Test
@Ignore("Fix error handling")
public void errorJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/bang"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void words() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/words")).build(),
String.class).getBody()).isEqualTo("foobar");
assertThat(
rest.exchange(RequestEntity.get(new URI("/words")).build(), String.class)
.getBody()).isEqualTo("foobar");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/empty"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class)
.exchange(RequestEntity.get(new URI("/empty"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[]");
}
@Test
public void sentences() throws Exception {
assertThat(rest.exchange(RequestEntity
.get(new URI("http://localhost:" + port + "/sentences")).build(),
String.class).getBody())
.isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]");
assertThat(rest
.exchange(RequestEntity.get(new URI("/sentences")).build(), String.class)
.getBody()).isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]");
}
@Test
public void sentencesAcceptAny() throws Exception {
assertThat(rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/sentences"))
.accept(MediaType.ALL).build(),
RequestEntity.get(new URI("/sentences")).accept(MediaType.ALL).build(),
String.class).getBody())
.isEqualTo("[\"go\",\"home\"][\"come\",\"back\"]");
}
@Test
public void sentencesAcceptJson() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
ResponseEntity<String> result = rest
.exchange(
RequestEntity.get(new URI("/sentences"))
.accept(MediaType.APPLICATION_JSON).build(),
String.class);
assertThat(result.getBody()).isEqualTo("[[\"go\",\"home\"],[\"come\",\"back\"]]");
assertThat(result.getHeaders().getContentType())
.isEqualTo(MediaType.APPLICATION_JSON);
@@ -122,8 +128,7 @@ public class RestApplicationTests {
@Test
public void sentencesAcceptSse() throws Exception {
ResponseEntity<String> result = rest.exchange(
RequestEntity.get(new URI("http://localhost:" + port + "/sentences"))
.accept(EVENT_STREAM).build(),
RequestEntity.get(new URI("/sentences")).accept(EVENT_STREAM).build(),
String.class);
assertThat(result.getBody())
.isEqualTo(sse("[\"go\",\"home\"]", "[\"come\",\"back\"]"));
@@ -133,14 +138,14 @@ public class RestApplicationTests {
@Test
public void uppercase() {
assertThat(rest.postForObject("http://localhost:" + port + "/uppercase",
"foo\nbar", String.class)).isEqualTo("[FOO][BAR]");
assertThat(rest.postForObject("/uppercase", "foo\nbar", String.class))
.isEqualTo("[FOO][BAR]");
}
@Test
public void uppercaseJsonArray() throws Exception {
assertThat(rest.exchange(
RequestEntity.post(new URI("http://localhost:" + port + "/maps"))
RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
// The new line in the middle is optional
.body("[{\"value\":\"foo\"},\n{\"value\":\"bar\"}]"),
@@ -151,21 +156,20 @@ public class RestApplicationTests {
@Test
public void uppercaseJsonStream() throws Exception {
assertThat(rest
.exchange(
RequestEntity.post(new URI("http://localhost:" + port + "/maps"))
.contentType(MediaType.APPLICATION_JSON)
// TODO: make this work without newline separator
.body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"),
String.class)
.exchange(RequestEntity.post(new URI("/maps"))
.contentType(MediaType.APPLICATION_JSON)
// TODO: make this work without newline separator
.body("{\"value\":\"foo\"}\n{\"value\":\"bar\"}"), String.class)
.getBody()).isEqualTo("{\"value\":\"FOO\"}{\"value\":\"BAR\"}");
}
@Test
public void uppercaseSSE() throws Exception {
assertThat(rest.exchange(RequestEntity
.post(new URI("http://localhost:" + port + "/uppercase"))
.accept(EVENT_STREAM).contentType(EVENT_STREAM).body(sse("foo", "bar")),
String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]"));
assertThat(
rest.exchange(
RequestEntity.post(new URI("/uppercase")).accept(EVENT_STREAM)
.contentType(EVENT_STREAM).body(sse("foo", "bar")),
String.class).getBody()).isEqualTo(sse("[FOO]", "[BAR]"));
}
private String sse(String... values) {
@@ -194,6 +198,16 @@ public class RestApplicationTests {
return () -> Flux.fromArray(new String[] { "foo", "bar" });
}
@Bean
public Supplier<Flux<String>> bang() {
return () -> Flux.fromArray(new String[] { "foo", "bar" }).map(value -> {
if (value.equals("bar")) {
throw new RuntimeException("Bar");
}
return value;
});
}
@Bean
public Supplier<Flux<String>> empty() {
return () -> Flux.fromIterable(Collections.emptyList());