diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 5394ba9199..47ba75468a 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -101,8 +101,8 @@ public abstract class DataBufferUtils { bufferSize); return Flux.generate(generator); }, - DataBufferUtils::closeChannel - ); + DataBufferUtils::closeChannel) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } /** @@ -140,14 +140,16 @@ public abstract class DataBufferUtils { DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(bufferSize); ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); - return Flux.using(channelSupplier, + Flux result = Flux.using(channelSupplier, channel -> Flux.create(sink -> { - CompletionHandler completionHandler = - new AsynchronousFileChannelReadCompletionHandler(channel, - sink, position, dataBufferFactory, bufferSize); - channel.read(byteBuffer, position, dataBuffer, completionHandler); - }), + CompletionHandler completionHandler = + new AsynchronousFileChannelReadCompletionHandler(channel, + sink, position, dataBufferFactory, bufferSize); + channel.read(byteBuffer, position, dataBuffer, completionHandler); + }), DataBufferUtils::closeChannel); + + return result.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } /** @@ -391,12 +393,19 @@ public abstract class DataBufferUtils { } /** - * Release the given data buffer, if it is a {@link PooledDataBuffer}. + * Release the given data buffer, if it is a {@link PooledDataBuffer} and + * has been {@linkplain PooledDataBuffer#isAllocated() allocated}. * @param dataBuffer the data buffer to release * @return {@code true} if the buffer was released; {@code false} otherwise. */ public static boolean release(@Nullable DataBuffer dataBuffer) { - return (dataBuffer instanceof PooledDataBuffer && ((PooledDataBuffer) dataBuffer).release()); + if (dataBuffer instanceof PooledDataBuffer) { + PooledDataBuffer pooledDataBuffer = (PooledDataBuffer) dataBuffer; + if (pooledDataBuffer.isAllocated()) { + return pooledDataBuffer.release(); + } + } + return false; } /** diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java index 9cbe8c7826..b603b0d347 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/NettyDataBuffer.java @@ -259,6 +259,11 @@ public class NettyDataBuffer implements PooledDataBuffer { return new ByteBufOutputStream(this.byteBuf); } + @Override + public boolean isAllocated() { + return this.byteBuf.refCnt() > 0; + } + @Override public PooledDataBuffer retain() { return new NettyDataBuffer(this.byteBuf.retain(), this.dataBufferFactory); diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java index f12c6d0299..4c83ec37f6 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/PooledDataBuffer.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,12 @@ package org.springframework.core.io.buffer; */ public interface PooledDataBuffer extends DataBuffer { + /** + * Return {@code true} if this buffer is allocated; {@code false} if it has been deallocated. + * @since 5.1 + */ + boolean isAllocated(); + /** * Increase the reference count for this buffer by one. * @return this buffer @@ -32,9 +38,9 @@ public interface PooledDataBuffer extends DataBuffer { PooledDataBuffer retain(); /** - * Decrease the reference count for this buffer by one, and release it + * Decrease the reference count for this buffer by one, and deallocate it * once the count reaches zero. - * @return {@code true} if the buffer was released; {@code false} otherwise. + * @return {@code true} if the buffer was deallocated; {@code false} otherwise. */ boolean release(); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index a0c343da62..fb3a88e3bb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -29,7 +29,9 @@ import reactor.netty.Connection; import reactor.netty.http.server.HttpServerRequest; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; +import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.lang.Nullable; @@ -153,6 +155,7 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest { return this.request.remoteAddress(); } + @Override @Nullable protected SslInfo initSslInfo() { SslHandler sslHandler = ((Connection) this.request).channel().pipeline().get(SslHandler.class); @@ -165,7 +168,8 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { - return this.request.receive().retain().map(this.bufferFactory::wrap); + Flux body = this.request.receive().retain().map(this.bufferFactory::wrap); + return body.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } @SuppressWarnings("unchecked") diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 83c2ba483d..da6c061391 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -119,7 +119,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { - return Flux.from(this.body); + return Flux.from(this.body) + .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } @SuppressWarnings("unchecked") @@ -216,6 +217,11 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { this.pooledByteBuffer = pooledByteBuffer; } + @Override + public boolean isAllocated() { + return this.pooledByteBuffer.isOpen(); + } + @Override public PooledDataBuffer retain() { return this;