If HTTP client asks for JSON, then time out the response

An HTTP response does not have to be an infinite stream, and in fact
life is simpler if it is not. The timeout in the web wrappers can
be used to close the response and return normally to a client
that has been waiting more than (say) 1s, instead of treating
it as an error condition.

Error handling is still kind of unsolved.
This commit is contained in:
Dave Syer
2017-03-09 16:10:08 +00:00
parent ec097a563d
commit 17b644f563
6 changed files with 74 additions and 17 deletions

View File

@@ -70,6 +70,7 @@ public class FunctionController {
}
@GetMapping(path = "/{name}")
@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<String> supplier(@PathVariable String name) {
Supplier<Object> supplier = functions.lookupSupplier(name);
if (!FunctionUtils.isFluxSupplier(supplier)) {

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.function.web.flux;
import java.time.Duration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.server.ServerHttpResponse;
@@ -38,9 +40,10 @@ class FluxResponseBodyEmitter<T> extends ResponseBodyEmitter {
public FluxResponseBodyEmitter(Long timeout, MediaType mediaType,
Flux<T> observable) {
super(timeout);
super();
this.mediaType = mediaType;
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
new ResponseBodyEmitterSubscriber<>(mediaType,
observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this);
}
@Override
@@ -48,7 +51,8 @@ class FluxResponseBodyEmitter<T> extends ResponseBodyEmitter {
super.extendResponse(outputMessage);
HttpHeaders headers = outputMessage.getHeaders();
if (headers.getContentType() == null && this.mediaType!=null && !MediaType.ALL.equals(this.mediaType)) {
if (headers.getContentType() == null && this.mediaType != null
&& !MediaType.ALL.equals(this.mediaType)) {
headers.setContentType(this.mediaType);
}
}

View File

@@ -16,6 +16,8 @@
package org.springframework.cloud.function.web.flux;
import java.time.Duration;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -35,8 +37,9 @@ class FluxResponseSseEmitter<T> extends SseEmitter {
}
public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux<T> observable) {
super(timeout);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
super();
new ResponseBodyEmitterSubscriber<>(mediaType,
observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this);
}
}

View File

@@ -40,12 +40,23 @@ import reactor.core.publisher.Flux;
public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {
private ResponseBodyEmitterReturnValueHandler delegate;
private long timeout = 1000L;
private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream");
public FluxReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters);
}
/**
* Timeout for clients. If no items are seen on an HTTP response in this period then
* the response is closed.
*
* @param timeout the timeout to set
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
return returnValue != null && supportsReturnType(returnType);
@@ -79,12 +90,14 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
MediaType mediaType = webRequest.getHeader("Accept") == null ? null
: MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator()
.next();
delegate.handleReturnValue(getEmitter(1000L, flux, mediaType),
returnType, mavContainer, webRequest);
delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType,
mavContainer, webRequest);
}
private ResponseBodyEmitter getEmitter(Long timeout, Flux<?> flux, MediaType mediaType) {
if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) {
private ResponseBodyEmitter getEmitter(Long timeout, Flux<?> flux,
MediaType mediaType) {
if (!MediaType.ALL.equals(mediaType)
&& EVENT_STREAM.isCompatibleWith(mediaType)) {
return new FluxResponseSseEmitter<>(timeout, mediaType, flux);
}
return new FluxResponseBodyEmitter<>(timeout, mediaType, flux);

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.function.web.flux;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -32,7 +33,7 @@ import reactor.core.publisher.Flux;
*
* @author Dave Syer
*/
class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
private final MediaType mediaType;
@@ -49,8 +50,8 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.responseBodyEmitter.onTimeout(this);
this.responseBodyEmitter.onCompletion(this);
this.responseBodyEmitter.onTimeout(new Timeout());
this.responseBodyEmitter.onCompletion(new Complete());
observable.subscribe(this);
}
@@ -98,11 +99,19 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (this.firstElementWritten) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[]");
}
else {
responseBodyEmitter.send("]");
}
}
responseBodyEmitter.completeWithError(e);
if (e instanceof TimeoutException) {
responseBodyEmitter.complete();
}
else {
responseBodyEmitter.completeWithError(e);
}
}
catch (IOException ex) {
throw new RuntimeException(ex.getMessage(), ex);
@@ -130,8 +139,20 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T>, Runnable {
}
}
@Override
public void run() {
this.subscription.cancel();
class Complete implements Runnable {
@Override
public void run() {
ResponseBodyEmitterSubscriber.this.subscription.cancel();
}
}
class Timeout implements Runnable {
@Override
public void run() {
onComplete();
ResponseBodyEmitterSubscriber.this.subscription.cancel();
}
}
}

View File

@@ -90,6 +90,14 @@ public class RestApplicationTests {
.getBody()).isEqualTo("foobar");
}
@Test
public void timeoutJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/timeout"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("[\"foo\"]");
}
@Test
public void emptyJson() throws Exception {
assertThat(rest
@@ -213,6 +221,13 @@ public class RestApplicationTests {
return () -> Flux.fromIterable(Collections.emptyList());
}
@Bean
public Supplier<Flux<String>> timeout() {
return () -> Flux.create(emitter -> {
emitter.next("foo");
});
}
@Bean
public Supplier<Flux<List<String>>> sentences() {
return () -> Flux.just(Arrays.asList("go", "home"),