LeakAwareDataBuffer related fixes

Following on 3ebbfa2191 where the local
refCount was removed in favor of using the internal refCount of the
native data buffer, this commit ensures that LeakAwareDataBufferFactory
uses a PooledDataBufferFactory delegate by default.

There are also fixes for test issues with eager allocation uncovered by
these changes in StringDecoder and ResourceDecoder.
This commit is contained in:
Rossen Stoyanchev
2019-04-16 20:51:40 -04:00
parent 0109231d8e
commit 375090bb7c
8 changed files with 62 additions and 121 deletions

View File

@@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufAllocator;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Publisher;
@@ -38,7 +37,6 @@ import reactor.test.StepVerifier;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.LeakAwareDataBufferFactory;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import static org.junit.Assert.*;
@@ -135,8 +133,7 @@ public class ChannelSendOperatorTests {
@Test // gh-22720
public void cancelWhileItemCached() {
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Mono.fromCallable(() -> {
@@ -164,8 +161,7 @@ public class ChannelSendOperatorTests {
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
// 3. Write Publisher fails right after that and before request(n) from server
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber();
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
@@ -200,8 +196,7 @@ public class ChannelSendOperatorTests {
// 2. writeFunction applied and writeCompletionBarrier subscribed to it
// 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server
NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate);
LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory();
ChannelSendOperator<DataBuffer> operator = new ChannelSendOperator<>(
Flux.create(sink -> {