From 915ff401bec8f39b509e2167ab05cbcad240304b Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Mon, 13 Mar 2017 15:01:03 +0000 Subject: [PATCH] Make GET /{function}/{value} single valued (Mono) When the user has sent us a single value, we can make the signature of the handler and the format of the HTTP response much more natural if it is single valued too (i.e. a Mono). --- .../function/web/FunctionController.java | 6 ++-- .../web/flux/FluxResponseBodyEmitter.java | 12 +++---- .../web/flux/FluxResponseSseEmitter.java | 11 +++---- .../web/flux/FluxReturnValueHandler.java | 16 ++++++--- .../flux/ResponseBodyEmitterSubscriber.java | 33 ++++++++++++------- .../function/web/RestApplicationTests.java | 14 ++++++++ 6 files changed, 61 insertions(+), 31 deletions(-) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java index c702bbe81..4bc94fbdb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/FunctionController.java @@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * @author Dave Syer @@ -83,11 +84,12 @@ public class FunctionController { } @GetMapping(path = "/{name}/{value}") - public Flux value(@PathVariable String name, @PathVariable String value) { + public Mono value(@PathVariable String name, @PathVariable String value) { Function, Flux> function = functions.lookupFunction(name); if (function != null) { @SuppressWarnings({ "unchecked" }) - Flux result = (Flux) function.apply(Flux.just(value)); + Mono result = Mono + .from((Flux) function.apply(Flux.just(value))); return debug ? result.log() : result; } throw new IllegalArgumentException("no such function: " + name); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java index edca9879b..d175481cf 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseBodyEmitter.java @@ -16,7 +16,7 @@ package org.springframework.cloud.function.web.flux; -import java.time.Duration; +import org.reactivestreams.Publisher; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; @@ -34,16 +34,14 @@ class FluxResponseBodyEmitter extends ResponseBodyEmitter { private final MediaType mediaType; - public FluxResponseBodyEmitter(Flux observable) { - this(1000L, null, observable); + public FluxResponseBodyEmitter(Publisher observable) { + this(null, observable); } - public FluxResponseBodyEmitter(Long timeout, MediaType mediaType, - Flux observable) { + public FluxResponseBodyEmitter(MediaType mediaType, Publisher observable) { super(); this.mediaType = mediaType; - new ResponseBodyEmitterSubscriber<>(mediaType, - observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); } @Override diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java index 8c39782e9..9d63be7fb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxResponseSseEmitter.java @@ -16,7 +16,7 @@ package org.springframework.cloud.function.web.flux; -import java.time.Duration; +import org.reactivestreams.Publisher; import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; @@ -32,14 +32,13 @@ import reactor.core.publisher.Flux; */ class FluxResponseSseEmitter extends SseEmitter { - public FluxResponseSseEmitter(Flux observable) { - this(null, MediaType.valueOf("text/event-stream"), observable); + public FluxResponseSseEmitter(Publisher observable) { + this(MediaType.valueOf("text/event-stream"), observable); } - public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux observable) { + public FluxResponseSseEmitter(MediaType mediaType, Publisher observable) { super(); - new ResponseBodyEmitterSubscriber<>(mediaType, - observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this); + new ResponseBodyEmitterSubscriber<>(mediaType, observable, this); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java index 8925b3fed..8f5702bfb 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/FluxReturnValueHandler.java @@ -16,8 +16,11 @@ package org.springframework.cloud.function.web.flux; +import java.time.Duration; import java.util.List; +import org.reactivestreams.Publisher; + import org.springframework.core.MethodParameter; import org.springframework.core.ResolvableType; import org.springframework.http.MediaType; @@ -30,6 +33,7 @@ import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux} @@ -64,7 +68,7 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand @Override public boolean supportsReturnType(MethodParameter returnType) { - return Flux.class.isAssignableFrom(returnType.getParameterType()) + return Publisher.class.isAssignableFrom(returnType.getParameterType()) || isResponseEntity(returnType); } @@ -85,7 +89,7 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand if (returnValue instanceof ResponseEntity) { adaptFrom = ((ResponseEntity) returnValue).getBody(); } - Flux flux = (Flux) adaptFrom; + Publisher flux = (Publisher) adaptFrom; MediaType mediaType = webRequest.getHeader("Accept") == null ? null : MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator() @@ -94,13 +98,15 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand mavContainer, webRequest); } - private ResponseBodyEmitter getEmitter(Long timeout, Flux flux, + private ResponseBodyEmitter getEmitter(Long timeout, Publisher flux, MediaType mediaType) { + Publisher exported = flux instanceof Mono ? Mono.from(flux) + : Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty()); if (!MediaType.ALL.equals(mediaType) && EVENT_STREAM.isCompatibleWith(mediaType)) { - return new FluxResponseSseEmitter<>(timeout, mediaType, flux); + return new FluxResponseSseEmitter<>(mediaType, exported); } - return new FluxResponseBodyEmitter<>(timeout, mediaType, flux); + return new FluxResponseBodyEmitter<>(mediaType, exported); } } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java index f4884265a..0436ab12d 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/flux/ResponseBodyEmitterSubscriber.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.web.flux; import java.io.IOException; import java.util.concurrent.TimeoutException; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -26,6 +27,7 @@ import org.springframework.http.MediaType; import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; /** * Subscriber that emits any value produced by the {@link Flux} into the delegated @@ -45,13 +47,16 @@ class ResponseBodyEmitterSubscriber implements Subscriber { private boolean firstElementWritten; - public ResponseBodyEmitterSubscriber(MediaType mediaType, Flux observable, + private boolean single; + + public ResponseBodyEmitterSubscriber(MediaType mediaType, Publisher observable, ResponseBodyEmitter responseBodyEmitter) { this.mediaType = mediaType; this.responseBodyEmitter = responseBodyEmitter; this.responseBodyEmitter.onTimeout(new Timeout()); this.responseBodyEmitter.onCompletion(new Complete()); + this.single = observable instanceof Mono; observable.subscribe(this); } @@ -70,8 +75,10 @@ class ResponseBodyEmitterSubscriber implements Subscriber { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { if (!this.firstElementWritten) { - responseBodyEmitter.send("["); - this.firstElementWritten = true; + if (!single) { + responseBodyEmitter.send("["); + this.firstElementWritten = true; + } } else { responseBodyEmitter.send(","); @@ -99,11 +106,13 @@ class ResponseBodyEmitterSubscriber implements Subscriber { try { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { - if (!this.firstElementWritten) { - responseBodyEmitter.send("[]"); - } - else { - responseBodyEmitter.send("]"); + if (!single) { + if (!this.firstElementWritten) { + responseBodyEmitter.send("[]"); + } + else { + responseBodyEmitter.send("]"); + } } } if (e instanceof TimeoutException) { @@ -126,10 +135,12 @@ class ResponseBodyEmitterSubscriber implements Subscriber { try { if (!MediaType.ALL.equals(mediaType) && MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { - if (!this.firstElementWritten) { - responseBodyEmitter.send("["); + if (!single) { + if (!this.firstElementWritten) { + responseBodyEmitter.send("["); + } + responseBodyEmitter.send("]"); } - responseBodyEmitter.send("]"); } } catch (IOException e) { diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java index 082558073..f67bb2836 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/RestApplicationTests.java @@ -160,6 +160,14 @@ public class RestApplicationTests { assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123.."); } + @Test + public void convertGetJson() throws Exception { + assertThat(rest + .exchange(RequestEntity.get(new URI("/entity/321")) + .accept(MediaType.APPLICATION_JSON).build(), String.class) + .getBody()).isEqualTo("{\"value\":321}"); + } + @Test public void uppercaseJsonArray() throws Exception { assertThat(rest.exchange( @@ -208,6 +216,12 @@ public class RestApplicationTests { return flux -> flux.log().map(value -> ".." + value + ".."); } + @Bean + public Function, Flux>> entity() { + return flux -> flux.log() + .map(value -> Collections.singletonMap("value", value)); + } + @Bean public Function>, Flux>> maps() { return flux -> flux.map(value -> {