From 17b644f5633b86155d7b2838fe9246b99c913949 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Thu, 9 Mar 2017 16:10:08 +0000 Subject: [PATCH] If HTTP client asks for JSON, then time out the response An HTTP response does not have to be an infinite stream, and in fact life is simpler if it is not. The timeout in the web wrappers can be used to close the response and return normally to a client that has been waiting more than (say) 1s, instead of treating it as an error condition. Error handling is still kind of unsolved. --- .../function/web/FunctionController.java | 1 + .../web/flux/FluxResponseBodyEmitter.java | 10 +++-- .../web/flux/FluxResponseSseEmitter.java | 7 +++- .../web/flux/FluxReturnValueHandler.java | 21 +++++++++-- .../flux/ResponseBodyEmitterSubscriber.java | 37 +++++++++++++++---- .../function/web/RestApplicationTests.java | 15 ++++++++ 6 files changed, 74 insertions(+), 17 deletions(-) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index 49b7db408..0990a94b3 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -70,6 +70,7 @@ public class FunctionController { } @GetMapping(path = "/{name}") + @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux supplier(@PathVariable String name) { Supplier supplier = functions.lookupSupplier(name); if (!FunctionUtils.isFluxSupplier(supplier)) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java index e32cd29f9..febe56dcd 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java @@ -16,6 +16,8 @@ package org.springframework.cloud.function.web.flux; +import java.time.Duration; + import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; @@ -38,9 +40,10 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { public FluxResponseBodyEmitter(Long timeout, MediaType mediaType, Flux observable) { - super(timeout); + super(); this.mediaType = mediaType; - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + new ResponseBodyEmitterSubscriber<>(mediaType, + observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); } @Override @@ -48,7 +51,8 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { super.extendResponse(outputMessage); HttpHeaders headers = outputMessage.getHeaders(); - if (headers.getContentType() == null && this.mediaType!=null && !MediaType.ALL.equals(this.mediaType)) { + if (headers.getContentType() == null && this.mediaType != null + && !MediaType.ALL.equals(this.mediaType)) { headers.setContentType(this.mediaType); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java index 54b55911d..8c39782e9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java @@ -16,6 +16,8 @@ package org.springframework.cloud.function.web.flux; +import java.time.Duration; + import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -35,8 +37,9 @@ class FluxResponseSseEmitter extends SseEmitter { } public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux observable) { - super(timeout); - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + super(); + new ResponseBodyEmitterSubscriber<>(mediaType, + observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java index 59c4b2396..8925b3fed 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java @@ -40,12 +40,23 @@ import reactor.core.publisher.Flux; public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { private ResponseBodyEmitterReturnValueHandler delegate; + private long timeout = 1000L; private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); public FluxReturnValueHandler(List> messageConverters) { delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters); } + /** + * Timeout for clients. If no items are seen on an HTTP response in this period then + * the response is closed. + * + * @param timeout the timeout to set + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + @Override public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { return returnValue != null && supportsReturnType(returnType); @@ -79,12 +90,14 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand MediaType mediaType = webRequest.getHeader("Accept") == null ? null : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() .next(); - delegate.handleReturnValue(getEmitter(1000L, flux, mediaType), - returnType, mavContainer, webRequest); + delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType, + mavContainer, webRequest); } - private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, MediaType mediaType) { - if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) { + private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, + MediaType mediaType) { + if (!MediaType.ALL.equals(mediaType) + && EVENT_STREAM.isCompatibleWith(mediaType)) { return new FluxResponseSseEmitter<>(timeout, mediaType, flux); } return new FluxResponseBodyEmitter<>(timeout, mediaType, flux); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java index fcff83d42..f4884265a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.web.flux; import java.io.IOException; +import java.util.concurrent.TimeoutException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -32,7 +33,7 @@ import reactor.core.publisher.Flux; * * @author Dave Syer */ -class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { +class ResponseBodyEmitterSubscriber implements Subscriber { private final MediaType mediaType; @@ -49,8 +50,8 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; - this.responseBodyEmitter.onTimeout(this); - this.responseBodyEmitter.onCompletion(this); + this.responseBodyEmitter.onTimeout(new Timeout()); + this.responseBodyEmitter.onCompletion(new Complete()); observable.subscribe(this); } @@ -98,11 +99,19 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { try { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { - if (this.firstElementWritten) { + if (!this.firstElementWritten) { + responseBodyEmitter.send("[]"); + } + else { responseBodyEmitter.send("]"); } } - responseBodyEmitter.completeWithError(e); + if (e instanceof TimeoutException) { + responseBodyEmitter.complete(); + } + else { + responseBodyEmitter.completeWithError(e); + } } catch (IOException ex) { throw new RuntimeException(ex.getMessage(), ex); @@ -130,8 +139,20 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { } } - @Override - public void run() { - this.subscription.cancel(); + class Complete implements Runnable { + + @Override + public void run() { + ResponseBodyEmitterSubscriber.this.subscription.cancel(); + } + } + + class Timeout implements Runnable { + + @Override + public void run() { + onComplete(); + ResponseBodyEmitterSubscriber.this.subscription.cancel(); + } } } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index e50101e32..8ac53e55b 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -90,6 +90,14 @@ public class RestApplicationTests { .getBody()).isEqualTo("foobar"); } + @Test + public void timeoutJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + @Test public void emptyJson() throws Exception { assertThat(rest @@ -213,6 +221,13 @@ public class RestApplicationTests { return () -> Flux.fromIterable(Collections.emptyList()); } + @Bean + public Supplier> timeout() { + return () -> Flux.create(emitter -> { + emitter.next("foo"); + }); + } + @Bean public Supplier>> sentences() { return () -> Flux.just(Arrays.asList("go", "home"),