From 375090bb7c7dd77ae67c2e6d25024f6eafe32910 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 16 Apr 2019 20:51:40 -0400 Subject: [PATCH] LeakAwareDataBuffer related fixes Following on 3ebbfa2191376d9920c57b545fbd3c07167b4c1e where the local refCount was removed in favor of using the internal refCount of the native data buffer, this commit ensures that LeakAwareDataBufferFactory uses a PooledDataBufferFactory delegate by default. There are also fixes for test issues with eager allocation uncovered by these changes in StringDecoder and ResourceDecoder. --- .../core/codec/AbstractDecoderTestCase.java | 69 ++++++++----------- .../core/codec/ResourceDecoderTests.java | 31 ++------- .../codec/ResourceRegionEncoderTests.java | 5 +- .../core/codec/StringDecoderTests.java | 49 ++++--------- .../core/io/buffer/LeakAwareDataBuffer.java | 5 ++ .../io/buffer/LeakAwareDataBufferFactory.java | 8 ++- .../reactive/ChannelSendOperatorTests.java | 11 +-- .../result/view/ZeroDemandResponse.java | 5 +- 8 files changed, 62 insertions(+), 121 deletions(-) diff --git a/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java b/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java index 4feb5766a2..cbbb3b4756 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java +++ b/spring-core/src/test/java/org/springframework/core/codec/AbstractDecoderTestCase.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.core.codec; +import java.time.Duration; import java.util.Map; import java.util.function.Consumer; @@ -32,6 +33,8 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; +import static org.junit.Assert.*; + /** * Abstract base class for {@link Decoder} unit tests. Subclasses need to implement * {@link #canDecode()}, {@link #decode()} and {@link #decodeToMono()}, possibly using the wide @@ -99,6 +102,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecodeAll(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecodeAll(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -122,6 +126,7 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeAll(Publisher input, ResolvableType outputType, Consumer> stepConsumer, @Nullable MimeType mimeType, @Nullable Map hints) { + testDecode(input, outputType, stepConsumer, mimeType, hints); testDecodeError(input, outputType, mimeType, hints); testDecodeCancel(input, outputType, mimeType, hints); @@ -151,6 +156,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecode(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecode(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -202,16 +208,14 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeError(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectNextCount(1) - .expectError(InputException.class) - .verify(); + input = Mono.from(input).concatWith(Flux.error(new InputException())); + try { + this.decoder.decode(input, outputType, mimeType, hints).blockLast(Duration.ofSeconds(5)); + fail(); + } + catch (InputException ex) { + // expected + } } /** @@ -229,11 +233,7 @@ public abstract class AbstractDecoderTestCase> @Nullable MimeType mimeType, @Nullable Map hints) { Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectNextCount(1) - .thenCancel() - .verify(); + StepVerifier.create(result).expectNextCount(1).thenCancel().verify(); } /** @@ -249,9 +249,7 @@ public abstract class AbstractDecoderTestCase> Flux input = Flux.empty(); Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .verifyComplete(); + StepVerifier.create(result).verifyComplete(); } // Mono @@ -297,6 +295,7 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoAll(Publisher input, ResolvableType outputType, Consumer> stepConsumer, @Nullable MimeType mimeType, @Nullable Map hints) { + testDecodeToMono(input, outputType, stepConsumer, mimeType, hints); testDecodeToMonoError(input, outputType, mimeType, hints); testDecodeToMonoCancel(input, outputType, mimeType, hints); @@ -326,6 +325,7 @@ public abstract class AbstractDecoderTestCase> */ protected void testDecodeToMono(Publisher input, Class outputClass, Consumer> stepConsumer) { + testDecodeToMono(input, ResolvableType.forClass(outputClass), stepConsumer, null, null); } @@ -377,15 +377,9 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoError(Publisher input, ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - + input = Mono.from(input).concatWith(Flux.error(new InputException())); Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); + StepVerifier.create(result).expectError(InputException.class).verify(); } /** @@ -401,10 +395,7 @@ public abstract class AbstractDecoderTestCase> @Nullable MimeType mimeType, @Nullable Map hints) { Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .thenCancel() - .verify(); + StepVerifier.create(result).thenCancel().verify(); } /** @@ -418,11 +409,8 @@ public abstract class AbstractDecoderTestCase> protected void testDecodeToMonoEmpty(ResolvableType outputType, @Nullable MimeType mimeType, @Nullable Map hints) { - Flux input = Flux.empty(); - Mono result = this.decoder.decodeToMono(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .verifyComplete(); + Mono result = this.decoder.decodeToMono(Flux.empty(), outputType, mimeType, hints); + StepVerifier.create(result).verifyComplete(); } /** @@ -431,10 +419,10 @@ public abstract class AbstractDecoderTestCase> * @return the deferred buffer */ protected Mono dataBuffer(byte[] bytes) { - return Mono.defer(() -> { + return Mono.fromCallable(() -> { DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(bytes.length); dataBuffer.write(bytes); - return Mono.just(dataBuffer); + return dataBuffer; }); } @@ -442,9 +430,6 @@ public abstract class AbstractDecoderTestCase> * Exception used in {@link #testDecodeError} and {@link #testDecodeToMonoError} */ @SuppressWarnings("serial") - public static class InputException extends RuntimeException { - - } - + public static class InputException extends RuntimeException {} } diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java index d8bab5a1f5..71f87f69d6 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,25 +18,19 @@ package org.springframework.core.codec; import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import org.junit.Test; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; -import reactor.test.StepVerifier; -import org.springframework.core.ResolvableType; import org.springframework.core.io.ByteArrayResource; import org.springframework.core.io.InputStreamResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.lang.Nullable; -import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.StreamUtils; import static org.junit.Assert.*; -import static org.springframework.core.ResolvableType.forClass; +import static org.springframework.core.ResolvableType.*; /** * @author Arjen Poutsma @@ -66,9 +60,7 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase input = Flux.concat( - dataBuffer(this.fooBytes), - dataBuffer(this.barBytes)); + Flux input = Flux.concat(dataBuffer(this.fooBytes), dataBuffer(this.barBytes)); testDecodeAll(input, Resource.class, step -> step .consumeNextWith(resource -> { @@ -85,22 +77,7 @@ public class ResourceDecoderTests extends AbstractDecoderTestCase input, ResolvableType outputType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); - } - - @Override - public void decodeToMono() throws Exception { + public void decodeToMono() { Flux input = Flux.concat( dataBuffer(this.fooBytes), dataBuffer(this.barBytes)); diff --git a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java index 99f058fd05..5ca6795d7c 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/ResourceRegionEncoderTests.java @@ -19,7 +19,6 @@ package org.springframework.core.codec; import java.util.Collections; import java.util.function.Consumer; -import io.netty.buffer.PooledByteBufAllocator; import org.junit.After; import org.junit.Test; import org.reactivestreams.Subscription; @@ -34,7 +33,6 @@ import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.support.DataBufferTestUtils; import org.springframework.core.io.support.ResourceRegion; import org.springframework.util.MimeType; @@ -51,8 +49,7 @@ public class ResourceRegionEncoderTests { private ResourceRegionEncoder encoder = new ResourceRegionEncoder(); - private LeakAwareDataBufferFactory bufferFactory = - new LeakAwareDataBufferFactory(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)); + private LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); @After diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index 9110cc9dd1..ed10c96c83 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,18 +19,16 @@ package org.springframework.core.codec; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.Map; import org.junit.Test; -import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.lang.Nullable; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; @@ -61,10 +59,8 @@ public class StringDecoderTests extends AbstractDecoderTestCase { assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_HTML)); assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.APPLICATION_JSON)); assertTrue(this.decoder.canDecode(TYPE, MimeTypeUtils.parseMimeType("text/plain;charset=utf-8"))); - assertFalse(this.decoder.canDecode( - ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)); - assertFalse(this.decoder.canDecode( - ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)); + assertFalse(this.decoder.canDecode(ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)); + assertFalse(this.decoder.canDecode(ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)); } @Override @@ -76,24 +72,7 @@ public class StringDecoderTests extends AbstractDecoderTestCase { String s = String.format("%s\n%s\n%s", u, e, o); Flux input = toDataBuffers(s, 1, UTF_8); - testDecodeAll(input, ResolvableType.forClass(String.class), step -> step - .expectNext(u, e, o) - .verifyComplete(), null, null); - } - - @Override - protected void testDecodeError(Publisher input, ResolvableType outputType, - @Nullable MimeType mimeType, @Nullable Map hints) { - - input = Flux.concat( - Flux.from(input).take(1), - Flux.error(new InputException())); - - Flux result = this.decoder.decode(input, outputType, mimeType, hints); - - StepVerifier.create(result) - .expectError(InputException.class) - .verify(); + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); } @Test @@ -105,21 +84,21 @@ public class StringDecoderTests extends AbstractDecoderTestCase { Flux source = toDataBuffers(s, 2, UTF_16BE); MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be"); - testDecode(source, TYPE, step -> step - .expectNext(u, e, o) - .verifyComplete(), mimeType, null); + testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null); } private Flux toDataBuffers(String s, int length, Charset charset) { byte[] bytes = s.getBytes(charset); - - List dataBuffers = new ArrayList<>(); + List chunks = new ArrayList<>(); for (int i = 0; i < bytes.length; i += length) { - DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length); - dataBuffer.write(bytes, i, length); - dataBuffers.add(dataBuffer); + chunks.add(Arrays.copyOfRange(bytes, i, i + length)); } - return Flux.fromIterable(dataBuffers); + return Flux.fromIterable(chunks) + .map(chunk -> { + DataBuffer dataBuffer = this.bufferFactory.allocateBuffer(length); + dataBuffer.write(chunk, 0, chunk.length); + return dataBuffer; + }); } @Test diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java index a41a99aa69..155dc056b5 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java @@ -63,6 +63,11 @@ class LeakAwareDataBuffer implements PooledDataBuffer { return this.leakError; } + + public DataBuffer getDelegate() { + return this.delegate; + } + @Override public boolean isAllocated() { return this.delegate instanceof PooledDataBuffer && diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index 62900ad9f5..230d1ac51e 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import io.netty.buffer.PooledByteBufAllocator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.jetbrains.annotations.NotNull; @@ -55,7 +56,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { * {@link DefaultDataBufferFactory}. */ public LeakAwareDataBufferFactory() { - this(new DefaultDataBufferFactory()); + this(new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT)); } /** @@ -67,6 +68,7 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { this.delegate = delegate; } + /** * Checks whether all of the data buffers allocated by this factory have also been released. * If not, then an {@link AssertionError} is thrown. Typically used from a JUnit {@link After} @@ -126,6 +128,10 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { @Override public DataBuffer join(List dataBuffers) { + // Remove LeakAwareDataBuffer wrapper so delegate can find native buffers + dataBuffers = dataBuffers.stream() + .map(o -> o instanceof LeakAwareDataBuffer ? ((LeakAwareDataBuffer) o).getDelegate() : o) + .collect(Collectors.toList()); return new LeakAwareDataBuffer(this.delegate.join(dataBuffers), this); } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java index d1aa4fc9f9..3c905f6227 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ChannelSendOperatorTests.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import io.netty.buffer.ByteBufAllocator; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; @@ -38,7 +37,6 @@ import reactor.test.StepVerifier; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import static org.junit.Assert.*; @@ -135,8 +133,7 @@ public class ChannelSendOperatorTests { @Test // gh-22720 public void cancelWhileItemCached() { - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ChannelSendOperator operator = new ChannelSendOperator<>( Mono.fromCallable(() -> { @@ -164,8 +161,7 @@ public class ChannelSendOperatorTests { // 2. writeFunction applied and writeCompletionBarrier subscribed to it // 3. Write Publisher fails right after that and before request(n) from server - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ZeroDemandSubscriber writeSubscriber = new ZeroDemandSubscriber(); ChannelSendOperator operator = new ChannelSendOperator<>( @@ -200,8 +196,7 @@ public class ChannelSendOperatorTests { // 2. writeFunction applied and writeCompletionBarrier subscribed to it // 3. writeFunction fails, e.g. to flush status and headers, before request(n) from server - NettyDataBufferFactory delegate = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT); - LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(delegate); + LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); ChannelSendOperator operator = new ChannelSendOperator<>( Flux.create(sink -> { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java index 4e4eee2f6b..e75fcdf783 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/result/view/ZeroDemandResponse.java @@ -17,7 +17,6 @@ package org.springframework.web.reactive.result.view; import java.util.function.Supplier; -import io.netty.buffer.PooledByteBufAllocator; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; import reactor.core.publisher.BaseSubscriber; @@ -26,7 +25,6 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.LeakAwareDataBufferFactory; -import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; @@ -47,8 +45,7 @@ public class ZeroDemandResponse implements ServerHttpResponse { public ZeroDemandResponse() { - NettyDataBufferFactory delegate = new NettyDataBufferFactory(PooledByteBufAllocator.DEFAULT); - this.bufferFactory = new LeakAwareDataBufferFactory(delegate); + this.bufferFactory = new LeakAwareDataBufferFactory(); }