Make GET /{function}/{value} single valued (Mono)

When the user has sent us a single value, we can make the signature
of the handler and the format of the HTTP response much more
natural if it is single valued too (i.e. a Mono).
This commit is contained in:
Dave Syer
2017-03-13 15:01:03 +00:00
parent 9b24d23df8
commit 915ff401be
6 changed files with 61 additions and 31 deletions

View File

@@ -34,6 +34,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* @author Dave Syer
@@ -83,11 +84,12 @@ public class FunctionController {
}
@GetMapping(path = "/{name}/{value}")
public Flux<String> value(@PathVariable String name, @PathVariable String value) {
public Mono<String> value(@PathVariable String name, @PathVariable String value) {
Function<Flux<?>, Flux<?>> function = functions.lookupFunction(name);
if (function != null) {
@SuppressWarnings({ "unchecked" })
Flux<String> result = (Flux<String>) function.apply(Flux.just(value));
Mono<String> result = Mono
.from((Flux<String>) function.apply(Flux.just(value)));
return debug ? result.log() : result;
}
throw new IllegalArgumentException("no such function: " + name);

View File

@@ -16,7 +16,7 @@
package org.springframework.cloud.function.web.flux;
import java.time.Duration;
import org.reactivestreams.Publisher;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
@@ -34,16 +34,14 @@ class FluxResponseBodyEmitter<T> extends ResponseBodyEmitter {
private final MediaType mediaType;
public FluxResponseBodyEmitter(Flux<T> observable) {
this(1000L, null, observable);
public FluxResponseBodyEmitter(Publisher<T> observable) {
this(null, observable);
}
public FluxResponseBodyEmitter(Long timeout, MediaType mediaType,
Flux<T> observable) {
public FluxResponseBodyEmitter(MediaType mediaType, Publisher<T> observable) {
super();
this.mediaType = mediaType;
new ResponseBodyEmitterSubscriber<>(mediaType,
observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
}
@Override

View File

@@ -16,7 +16,7 @@
package org.springframework.cloud.function.web.flux;
import java.time.Duration;
import org.reactivestreams.Publisher;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
@@ -32,14 +32,13 @@ import reactor.core.publisher.Flux;
*/
class FluxResponseSseEmitter<T> extends SseEmitter {
public FluxResponseSseEmitter(Flux<T> observable) {
this(null, MediaType.valueOf("text/event-stream"), observable);
public FluxResponseSseEmitter(Publisher<T> observable) {
this(MediaType.valueOf("text/event-stream"), observable);
}
public FluxResponseSseEmitter(Long timeout, MediaType mediaType, Flux<T> observable) {
public FluxResponseSseEmitter(MediaType mediaType, Publisher<T> observable) {
super();
new ResponseBodyEmitterSubscriber<>(mediaType,
observable.timeout(Duration.ofMillis(timeout), Flux.empty()), this);
new ResponseBodyEmitterSubscriber<>(mediaType, observable, this);
}
}

View File

@@ -16,8 +16,11 @@
package org.springframework.cloud.function.web.flux;
import java.time.Duration;
import java.util.List;
import org.reactivestreams.Publisher;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.http.MediaType;
@@ -30,6 +33,7 @@ import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitterReturnValueHandler;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* A specialized {@link AsyncHandlerMethodReturnValueHandler} that handles {@link Flux}
@@ -64,7 +68,7 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Flux.class.isAssignableFrom(returnType.getParameterType())
return Publisher.class.isAssignableFrom(returnType.getParameterType())
|| isResponseEntity(returnType);
}
@@ -85,7 +89,7 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
if (returnValue instanceof ResponseEntity) {
adaptFrom = ((ResponseEntity<?>) returnValue).getBody();
}
Flux<?> flux = (Flux<?>) adaptFrom;
Publisher<?> flux = (Publisher<?>) adaptFrom;
MediaType mediaType = webRequest.getHeader("Accept") == null ? null
: MediaType.parseMediaTypes(webRequest.getHeader("Accept")).iterator()
@@ -94,13 +98,15 @@ public class FluxReturnValueHandler implements AsyncHandlerMethodReturnValueHand
mavContainer, webRequest);
}
private ResponseBodyEmitter getEmitter(Long timeout, Flux<?> flux,
private ResponseBodyEmitter getEmitter(Long timeout, Publisher<?> flux,
MediaType mediaType) {
Publisher<?> exported = flux instanceof Mono ? Mono.from(flux)
: Flux.from(flux).timeout(Duration.ofMillis(timeout), Flux.empty());
if (!MediaType.ALL.equals(mediaType)
&& EVENT_STREAM.isCompatibleWith(mediaType)) {
return new FluxResponseSseEmitter<>(timeout, mediaType, flux);
return new FluxResponseSseEmitter<>(mediaType, exported);
}
return new FluxResponseBodyEmitter<>(timeout, mediaType, flux);
return new FluxResponseBodyEmitter<>(mediaType, exported);
}
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.function.web.flux;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
@@ -26,6 +27,7 @@ import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Subscriber that emits any value produced by the {@link Flux} into the delegated
@@ -45,13 +47,16 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
private boolean firstElementWritten;
public ResponseBodyEmitterSubscriber(MediaType mediaType, Flux<T> observable,
private boolean single;
public ResponseBodyEmitterSubscriber(MediaType mediaType, Publisher<T> observable,
ResponseBodyEmitter responseBodyEmitter) {
this.mediaType = mediaType;
this.responseBodyEmitter = responseBodyEmitter;
this.responseBodyEmitter.onTimeout(new Timeout());
this.responseBodyEmitter.onCompletion(new Complete());
this.single = observable instanceof Mono;
observable.subscribe(this);
}
@@ -70,8 +75,10 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
this.firstElementWritten = true;
if (!single) {
responseBodyEmitter.send("[");
this.firstElementWritten = true;
}
}
else {
responseBodyEmitter.send(",");
@@ -99,11 +106,13 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[]");
}
else {
responseBodyEmitter.send("]");
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[]");
}
else {
responseBodyEmitter.send("]");
}
}
}
if (e instanceof TimeoutException) {
@@ -126,10 +135,12 @@ class ResponseBodyEmitterSubscriber<T> implements Subscriber<T> {
try {
if (!MediaType.ALL.equals(mediaType)
&& MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
if (!single) {
if (!this.firstElementWritten) {
responseBodyEmitter.send("[");
}
responseBodyEmitter.send("]");
}
responseBodyEmitter.send("]");
}
}
catch (IOException e) {

View File

@@ -160,6 +160,14 @@ public class RestApplicationTests {
assertThat(rest.getForObject("/wrap/123", String.class)).isEqualTo("..123..");
}
@Test
public void convertGetJson() throws Exception {
assertThat(rest
.exchange(RequestEntity.get(new URI("/entity/321"))
.accept(MediaType.APPLICATION_JSON).build(), String.class)
.getBody()).isEqualTo("{\"value\":321}");
}
@Test
public void uppercaseJsonArray() throws Exception {
assertThat(rest.exchange(
@@ -208,6 +216,12 @@ public class RestApplicationTests {
return flux -> flux.log().map(value -> ".." + value + "..");
}
@Bean
public Function<Flux<Integer>, Flux<Map<String, Object>>> entity() {
return flux -> flux.log()
.map(value -> Collections.singletonMap("value", value));
}
@Bean
public Function<Flux<HashMap<String, String>>, Flux<Map<String, String>>> maps() {
return flux -> flux.map(value -> {