Pass Mono to Reactor Netty when feasible

Closes gh-22800
This commit is contained in:
Rossen Stoyanchev
2019-04-16 16:54:33 -04:00
parent 15b2fb1210
commit 5b711a964b
5 changed files with 49 additions and 17 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -85,8 +85,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase {
String expected = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3";
StepVerifier.create(response.getBody())
.consumeNextWith(stringConsumer(
expected))
.consumeNextWith(stringConsumer(expected))
.expectComplete()
.verify();
HttpHeaders headers = response.getHeaders();
@@ -96,8 +95,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase {
private Consumer<DataBuffer> stringConsumer(String expected) {
return dataBuffer -> {
String value =
DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
String value = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8);
DataBufferUtils.release(dataBuffer);
assertEquals(expected, value);
};

View File

@@ -25,6 +25,7 @@ import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -61,8 +62,11 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse {
public MockServerHttpResponse(DataBufferFactory dataBufferFactory) {
super(dataBufferFactory);
this.writeHandler = body -> {
this.body = body.cache();
return this.body.then();
// Avoid .then() which causes data buffers to be released
MonoProcessor<Void> completion = MonoProcessor.create();
this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache();
this.body.subscribe();
return completion;
};
}