diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index 49b7db408..0990a94b3 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -70,6 +70,7 @@ public class FunctionController { } @GetMapping(path = "/{name}") + @SuppressWarnings({ "unchecked", "rawtypes" }) public Flux supplier(@PathVariable String name) { Supplier supplier = functions.lookupSupplier(name); if (!FunctionUtils.isFluxSupplier(supplier)) { diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java index e32cd29f9..febe56dcd 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java @@ -16,6 +16,8 @@ package org.springframework.cloud.function.web.flux; +import java.time.Duration; + import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; @@ -38,9 +40,10 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { public FluxResponseBodyEmitter(Long timeout, MediaType mediaType, Flux observable) { - super(timeout); + super(); this.mediaType = mediaType; - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + new ResponseBodyEmitterSubscriber<>(mediaType, + observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); } @Override @@ -48,7 +51,8 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { super.extendResponse(outputMessage); HttpHeaders headers = outputMessage.getHeaders(); - if (headers.getContentType() == null && this.mediaType!=null && !MediaType.ALL.equals(this.mediaType)) { + if (headers.getContentType() == null && this.mediaType != null + && !MediaType.ALL.equals(this.mediaType)) { headers.setContentType(this.mediaType); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java index 54b55911d..8c39782e9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java @@ -16,6 +16,8 @@ package org.springframework.cloud.function.web.flux; +import java.time.Duration; + import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -35,8 +37,9 @@ class FluxResponseSseEmitter extends SseEmitter { } public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux observable) { - super(timeout); - new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); + super(); + new ResponseBodyEmitterSubscriber<>(mediaType, + observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java index 59c4b2396..8925b3fed 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java @@ -40,12 +40,23 @@ import reactor.core.publisher.Flux; public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { private ResponseBodyEmitterReturnValueHandler delegate; + private long timeout = 1000L; private static final MediaType EVENT_STREAM = MediaType.valueOf("text/event-stream"); public FluxReturnValueHandler(List> messageConverters) { delegate = new ResponseBodyEmitterReturnValueHandler(messageConverters); } + /** + * Timeout for clients. If no items are seen on an HTTP response in this period then + * the response is closed. + * + * @param timeout the timeout to set + */ + public void setTimeout(long timeout) { + this.timeout = timeout; + } + @Override public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { return returnValue != null && supportsReturnType(returnType); @@ -79,12 +90,14 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand MediaType mediaType = webRequest.getHeader("Accept") == null ? null : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() .next(); - delegate.handleReturnValue(getEmitter(1000L, flux, mediaType), - returnType, mavContainer, webRequest); + delegate.handleReturnValue(getEmitter(timeout, flux, mediaType), returnType, + mavContainer, webRequest); } - private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, MediaType mediaType) { - if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) { + private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, + MediaType mediaType) { + if (!MediaType.ALL.equals(mediaType) + && EVENT_STREAM.isCompatibleWith(mediaType)) { return new FluxResponseSseEmitter<>(timeout, mediaType, flux); } return new FluxResponseBodyEmitter<>(timeout, mediaType, flux); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java index fcff83d42..f4884265a 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -17,6 +17,7 @@ package org.springframework.cloud.function.web.flux; import java.io.IOException; +import java.util.concurrent.TimeoutException; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -32,7 +33,7 @@ import reactor.core.publisher.Flux; * * @author Dave Syer */ -class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { +class ResponseBodyEmitterSubscriber implements Subscriber { private final MediaType mediaType; @@ -49,8 +50,8 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; - this.responseBodyEmitter.onTimeout(this); - this.responseBodyEmitter.onCompletion(this); + this.responseBodyEmitter.onTimeout(new Timeout()); + this.responseBodyEmitter.onCompletion(new Complete()); observable.subscribe(this); } @@ -98,11 +99,19 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { try { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { - if (this.firstElementWritten) { + if (!this.firstElementWritten) { + responseBodyEmitter.send("[]"); + } + else { responseBodyEmitter.send("]"); } } - responseBodyEmitter.completeWithError(e); + if (e instanceof TimeoutException) { + responseBodyEmitter.complete(); + } + else { + responseBodyEmitter.completeWithError(e); + } } catch (IOException ex) { throw new RuntimeException(ex.getMessage(), ex); @@ -130,8 +139,20 @@ class ResponseBodyEmitterSubscriber implements Subscriber, Runnable { } } - @Override - public void run() { - this.subscription.cancel(); + class Complete implements Runnable { + + @Override + public void run() { + ResponseBodyEmitterSubscriber.this.subscription.cancel(); + } + } + + class Timeout implements Runnable { + + @Override + public void run() { + onComplete(); + ResponseBodyEmitterSubscriber.this.subscription.cancel(); + } } } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index e50101e32..8ac53e55b 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -90,6 +90,14 @@ public class RestApplicationTests { .getBody()).isEqualTo("foobar"); } + @Test + public void timeoutJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/timeout")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("[\"foo\"]"); + } + @Test public void emptyJson() throws Exception { assertThat(rest @@ -213,6 +221,13 @@ public class RestApplicationTests { return () -> Flux.fromIterable(Collections.emptyList()); } + @Bean + public Supplier> timeout() { + return () -> Flux.create(emitter -> { + emitter.next("foo"); + }); + } + @Bean public Supplier>> sentences() { return () -> Flux.just(Arrays.asList("go", "home"),