diff --git a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java index 79f06edfb8..71fc199378 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/server/reactive/MockServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,8 +25,10 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpHeaders; @@ -54,10 +56,17 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse() { - super(new DefaultDataBufferFactory()); + this(new DefaultDataBufferFactory()); + } + + public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { + super(dataBufferFactory); this.writeHandler = body -> { - this.body = body.cache(); - return this.body.then(); + // Avoid .then() which causes data buffers to be released + MonoProcessor completion = MonoProcessor.create(); + this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + this.body.subscribe(); + return completion; }; } @@ -125,8 +134,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { * charset or "UTF-8" by default. */ public Mono getBodyAsString() { + Charset charset = Optional.ofNullable(getHeaders().getContentType()).map(MimeType::getCharset) .orElse(StandardCharsets.UTF_8); + return getBody() .reduce(bufferFactory().allocateBuffer(), (previous, current) -> { previous.write(current); @@ -137,8 +148,10 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { } private static String bufferToString(DataBuffer buffer, Charset charset) { + Assert.notNull(charset, "'charset' must not be null"); byte[] bytes = new byte[buffer.readableByteCount()]; buffer.read(bytes); + DataBufferUtils.release(buffer); return new String(bytes, charset); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index 8eef885950..8cc4d42348 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,8 +81,16 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); - return this.outbound.send(byteBufFlux).then(); + // Send as Mono if possible as an optimization hint to Reactor Netty + if (body instanceof Mono) { + Mono byteBufMono = Mono.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufMono).then(); + + } + else { + Flux byteBufFlux = Flux.from(body).map(NettyDataBufferFactory::toByteBuf); + return this.outbound.send(byteBufFlux).then(); + } }); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java index ee19d418b4..785e163e4f 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractServerHttpResponse.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,8 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpLogging; import org.springframework.http.HttpStatus; @@ -173,9 +175,16 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse { } @Override + @SuppressWarnings("unchecked") public final Mono writeWith(Publisher body) { - return new ChannelSendOperator<>(body, - writePublisher -> doCommit(() -> writeWithInternal(writePublisher))) + // Write as Mono if possible as an optimization hint to Reactor Netty + // ChannelSendOperator not necessary for Mono + if (body instanceof Mono) { + return ((Mono) body).flatMap(buffer -> + doCommit(() -> writeWithInternal(Mono.just(buffer))) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release)); + } + return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeWithInternal(inner))) .doOnError(t -> removeContentLength()); } diff --git a/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java b/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java index 1c6a8a0003..7f23f769b3 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/FormHttpMessageWriterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -85,8 +85,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase { String expected = "name+1=value+1&name+2=value+2%2B1&name+2=value+2%2B2&name+3"; StepVerifier.create(response.getBody()) - .consumeNextWith(stringConsumer( - expected)) + .consumeNextWith(stringConsumer(expected)) .expectComplete() .verify(); HttpHeaders headers = response.getHeaders(); @@ -96,8 +95,7 @@ public class FormHttpMessageWriterTests extends AbstractLeakCheckingTestCase { private Consumer stringConsumer(String expected) { return dataBuffer -> { - String value = - DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8); + String value = DataBufferTestUtils.dumpString(dataBuffer, StandardCharsets.UTF_8); DataBufferUtils.release(dataBuffer); assertEquals(expected, value); }; diff --git a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java index 76049ee9e6..366dbbdc3c 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/server/reactive/test/MockServerHttpResponse.java @@ -25,6 +25,7 @@ import java.util.function.Function; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -61,8 +62,11 @@ public class MockServerHttpResponse extends AbstractServerHttpResponse { public MockServerHttpResponse(DataBufferFactory dataBufferFactory) { super(dataBufferFactory); this.writeHandler = body -> { - this.body = body.cache(); - return this.body.then(); + // Avoid .then() which causes data buffers to be released + MonoProcessor completion = MonoProcessor.create(); + this.body = body.doOnComplete(completion::onComplete).doOnError(completion::onError).cache(); + this.body.subscribe(); + return completion; }; }