Improve writing in mock reactive request and response

Before this change the write Publisher was saved and Mono.empty()
returned from the write metohd which did not properly implement
the write contract since no writing ("consuming") was done.

This can be a problem in some cases. For example the request may appear
to succeed even if the publisher produces an error later when
subscribed to later after request handling completes.

This commit introduces a writeHandler function in the mock request and
response. By default it "writes" by consuming the content immediately,
which allows it to return a Mono<Void> that properly reflects when
writing is done, and it also caches the data so it may be replayed
later for test assertions.

For streaming scenario a custom writeHandler may be registered which
allows the custom handling to determine how long to stream before
cancelling so request handling may complete.

Issue: SPR-14590
This commit is contained in:
Rossen Stoyanchev
2017-02-07 21:57:38 -05:00
parent d41d28f8ce
commit f2967467e0
12 changed files with 225 additions and 74 deletions

View File

@@ -17,6 +17,7 @@
package org.springframework.mock.http.client.reactive.test;
import java.net.URI;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -28,6 +29,7 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.reactive.AbstractClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.util.Assert;
import org.springframework.web.util.UriComponentsBuilder;
/**
@@ -44,7 +46,11 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private Flux<DataBuffer> body;
private Flux<DataBuffer> body = Flux.error(
new IllegalStateException("The body is not set. " +
"Did handling complete with success? Is a custom \"writeHandler\" configured?"));
private Function<Flux<DataBuffer>, Mono<Void>> writeHandler = initDefaultWriteHandler();
public MockClientHttpRequest(HttpMethod httpMethod, String urlTemplate, Object... vars) {
@@ -56,6 +62,13 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
this.url = url;
}
private Function<Flux<DataBuffer>, Mono<Void>> initDefaultWriteHandler() {
return body -> {
this.body = body.cache();
return this.body.then();
};
}
@Override
public HttpMethod getMethod() {
@@ -72,22 +85,27 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
return this.bufferFactory;
}
/**
* Return the request body, or an error stream if the body was never set
* or when {@link #setWriteHandler} is configured.
*/
public Flux<DataBuffer> getBody() {
return this.body;
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
this.body = Flux.from(body);
return doCommit(() -> {
this.body = Flux.from(body);
return Mono.empty();
});
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(p -> p));
/**
* Configure a custom handler for writing the request body.
*
* <p>The default write handler consumes and caches the request body so it
* may be accessed subsequently, e.g. in test assertions. Use this property
* when the request body is an infinite stream.
*
* @param writeHandler the write handler to use returning {@code Mono<Void>}
* when the body has been "written" (i.e. consumed).
*/
public void setWriteHandler(Function<Flux<DataBuffer>, Mono<Void>> writeHandler) {
Assert.notNull(writeHandler, "'writeHandler' is required");
this.writeHandler = writeHandler;
}
@Override
@@ -98,9 +116,19 @@ public class MockClientHttpRequest extends AbstractClientHttpRequest {
protected void applyCookies() {
}
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> Mono.defer(() -> this.writeHandler.apply(Flux.from(body))));
}
@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(p -> p));
}
@Override
public Mono<Void> setComplete() {
return doCommit(Mono::empty);
return writeWith(Flux.empty());
}
}

View File

@@ -18,6 +18,7 @@ package org.springframework.mock.http.server.reactive.test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -38,24 +39,37 @@ import org.springframework.util.Assert;
*/
public class MockServerHttpResponse extends AbstractServerHttpResponse {
private Flux<DataBuffer> body;
private Flux<DataBuffer> body = Flux.error(
new IllegalStateException("The body is not set. " +
"Did handling complete with success? Is a custom \"writeHandler\" configured?"));
private Function<Flux<DataBuffer>, Mono<Void>> writeHandler = initDefaultWriteHandler();
public MockServerHttpResponse() {
super(new DefaultDataBufferFactory());
}
private Function<Flux<DataBuffer>, Mono<Void>> initDefaultWriteHandler() {
return body -> {
this.body = body.cache();
return this.body.then();
};
}
/**
* Return the output Publisher used to write to the response.
* Return the request body, or an error stream if the body was never set
* or when {@link #setWriteHandler} is configured.
*/
public Flux<DataBuffer> getBody() {
return this.body;
}
/**
* Return the response body aggregated and converted to a String using the
* charset of the Content-Type response or otherwise as "UTF-8".
* Shortcut method that delegates to {@link #getBody()} and then aggregates
* the data buffers and converts to a String using the charset of the
* Content-Type header or falling back on "UTF-8" by default.
*/
public Mono<String> getBodyAsString() {
Charset charset = getCharset();
@@ -84,15 +98,19 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
return (charset != null ? charset : StandardCharsets.UTF_8);
}
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
this.body = Flux.from(body);
return Mono.empty();
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWithInternal(Flux.from(body).flatMap(Flux::from));
/**
* Configure a custom handler for writing the request body.
*
* <p>The default write handler consumes and caches the request body so it
* may be accessed subsequently, e.g. in test assertions. Use this property
* when the request body is an infinite stream.
*
* @param writeHandler the write handler to use returning {@code Mono<Void>}
* when the body has been "written" (i.e. consumed).
*/
public void setWriteHandler(Function<Flux<DataBuffer>, Mono<Void>> writeHandler) {
Assert.notNull(writeHandler, "'writeHandler' is required");
this.writeHandler = writeHandler;
}
@Override
@@ -107,4 +125,19 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
protected void applyCookies() {
}
@Override
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> body) {
return this.writeHandler.apply(Flux.from(body));
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return this.writeHandler.apply(Flux.from(body).concatMap(Flux::from));
}
@Override
public Mono<Void> setComplete() {
return doCommit(() -> Mono.defer(() -> this.writeHandler.apply(Flux.empty())));
}
}