From bd956ed75adb2121be0471588bad1cd2d2748948 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 8 Apr 2019 08:24:37 -0400 Subject: [PATCH] DataBuffer fixes in Protobuf codecs Closes gh-22731 --- .../http/codec/protobuf/ProtobufDecoder.java | 24 +++++-- .../http/codec/protobuf/ProtobufEncoder.java | 47 ++++++------- ...ava => CancelWithoutDemandCodecTests.java} | 66 +++++++++++++++---- .../codec/protobuf/ProtobufDecoderTests.java | 12 ++-- 4 files changed, 104 insertions(+), 45 deletions(-) rename spring-web/src/test/java/org/springframework/http/codec/{CodecDataBufferLeakTests.java => CancelWithoutDemandCodecTests.java} (76%) diff --git a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java index ab0348c04e..c1f0e17bf7 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/protobuf/ProtobufDecoder.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -77,6 +77,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder, Method> methodCache = new ConcurrentReferenceHashMap<>(); + private final ExtensionRegistry extensionRegistry; private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE; @@ -114,8 +115,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder decode(Publisher inputStream, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { + MessageDecoderFunction decoderFunction = + new MessageDecoderFunction(elementType, this.maxMessageSize); + return Flux.from(inputStream) - .flatMapIterable(new MessageDecoderFunction(elementType, this.maxMessageSize)); + .flatMapIterable(decoderFunction) + .doOnTerminate(decoderFunction::discard); } @Override @@ -212,12 +217,13 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder 0); return messages; @@ -286,6 +292,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder encode(Publisher inputStream, DataBufferFactory bufferFactory, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - return Flux - .from(inputStream) - .map(message -> encodeMessage(message, bufferFactory, !(inputStream instanceof Mono))); - } - - private DataBuffer encodeMessage(Message message, DataBufferFactory bufferFactory, boolean streaming) { - DataBuffer buffer = bufferFactory.allocateBuffer(); - OutputStream outputStream = buffer.asOutputStream(); - try { - if (streaming) { - message.writeDelimitedTo(outputStream); - } - else { - message.writeTo(outputStream); - } - return buffer; - } - catch (IOException ex) { - throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex); - } + return Flux.from(inputStream) + .map(message -> { + DataBuffer buffer = bufferFactory.allocateBuffer(); + boolean release = true; + try { + if (!(inputStream instanceof Mono)) { + message.writeDelimitedTo(buffer.asOutputStream()); + } + else { + message.writeTo(buffer.asOutputStream()); + } + release = false; + return buffer; + } + catch (IOException ex) { + throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex); + } + finally { + if (release) { + DataBufferUtils.release(buffer); + } + } + }); } @Override diff --git a/spring-web/src/test/java/org/springframework/http/codec/CodecDataBufferLeakTests.java b/spring-web/src/test/java/org/springframework/http/codec/CancelWithoutDemandCodecTests.java similarity index 76% rename from spring-web/src/test/java/org/springframework/http/codec/CodecDataBufferLeakTests.java rename to spring-web/src/test/java/org/springframework/http/codec/CancelWithoutDemandCodecTests.java index 484020cb40..509e135542 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/CodecDataBufferLeakTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/CancelWithoutDemandCodecTests.java @@ -20,6 +20,8 @@ import java.util.Collections; import java.util.List; import java.util.function.Supplier; +import com.google.protobuf.Message; +import org.junit.After; import org.junit.Test; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -38,18 +40,28 @@ import org.springframework.http.ReactiveHttpOutputMessage; import org.springframework.http.client.MultipartBodyBuilder; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.http.codec.multipart.MultipartHttpMessageWriter; +import org.springframework.http.codec.protobuf.ProtobufDecoder; +import org.springframework.http.codec.protobuf.ProtobufEncoder; import org.springframework.http.codec.xml.Jaxb2XmlEncoder; +import org.springframework.protobuf.Msg; +import org.springframework.protobuf.SecondMsg; +import org.springframework.util.MimeType; /** * Test scenarios for data buffer leaks. * @author Rossen Stoyanchev - * @since 5.2 */ -public class CodecDataBufferLeakTests { +public class CancelWithoutDemandCodecTests { private final LeakAwareDataBufferFactory bufferFactory = new LeakAwareDataBufferFactory(); + @After + public void tearDown() throws Exception { + this.bufferFactory.checkForLeaks(); + } + + @Test // gh-22107 public void cancelWithEncoderHttpMessageWriterAndSingleValue() { CharSequenceEncoder encoder = CharSequenceEncoder.allMimeTypes(); @@ -58,8 +70,6 @@ public class CodecDataBufferLeakTests { writer.write(Mono.just("foo"), ResolvableType.forType(String.class), MediaType.TEXT_PLAIN, outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5)); - - this.bufferFactory.checkForLeaks(); } @Test // gh-22107 @@ -73,8 +83,6 @@ public class CodecDataBufferLeakTests { BaseSubscriber subscriber = new ZeroDemandSubscriber(); flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just).. subscriber.cancel(); - - this.bufferFactory.checkForLeaks(); } @Test // gh-22107 @@ -88,8 +96,39 @@ public class CodecDataBufferLeakTests { BaseSubscriber subscriber = new ZeroDemandSubscriber(); flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just).. subscriber.cancel(); + } - this.bufferFactory.checkForLeaks(); + @Test // gh-22543 + public void cancelWithProtobufEncoder() { + ProtobufEncoder encoder = new ProtobufEncoder(); + Msg msg = Msg.newBuilder().setFoo("Foo").setBlah(SecondMsg.newBuilder().setBlah(123).build()).build(); + + Flux flux = encoder.encode(Mono.just(msg), + this.bufferFactory, ResolvableType.forClass(Msg.class), + new MimeType("application", "x-protobuf"), Collections.emptyMap()); + + BaseSubscriber subscriber = new ZeroDemandSubscriber(); + flux.subscribe(subscriber); // Assume sync execution (e.g. encoding with Flux.just).. + subscriber.cancel(); + } + + @Test // gh-22731 + public void cancelWithProtobufDecoder() throws InterruptedException { + ProtobufDecoder decoder = new ProtobufDecoder(); + + Mono input = Mono.fromCallable(() -> { + Msg msg = Msg.newBuilder().setFoo("Foo").build(); + byte[] bytes = msg.toByteArray(); + DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); + buffer.write(bytes); + return buffer; + }); + + Flux messages = decoder.decode(input, ResolvableType.forType(Msg.class), + new MimeType("application", "x-protobuf"), Collections.emptyMap()); + ZeroDemandMessageSubscriber subscriber = new ZeroDemandMessageSubscriber(); + messages.subscribe(subscriber); + subscriber.cancel(); } @Test // gh-22107 @@ -104,8 +143,6 @@ public class CodecDataBufferLeakTests { writer.write(Mono.just(builder.build()), null, MediaType.MULTIPART_FORM_DATA, outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5)); - - this.bufferFactory.checkForLeaks(); } @Test // gh-22107 @@ -116,8 +153,6 @@ public class CodecDataBufferLeakTests { writer.write(Mono.just(event), ResolvableType.forClass(ServerSentEvent.class), MediaType.TEXT_EVENT_STREAM, outputMessage, Collections.emptyMap()).block(Duration.ofSeconds(5)); - - this.bufferFactory.checkForLeaks(); } @@ -183,4 +218,13 @@ public class CodecDataBufferLeakTests { // Just subscribe without requesting } } + + + private static class ZeroDemandMessageSubscriber extends BaseSubscriber { + + @Override + protected void hookOnSubscribe(Subscription subscription) { + // Just subscribe without requesting + } + } } diff --git a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java index 68a737b07e..07e944fc38 100644 --- a/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java +++ b/spring-web/src/test/java/org/springframework/http/codec/protobuf/ProtobufDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2019 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,10 +35,10 @@ import org.springframework.protobuf.Msg; import org.springframework.protobuf.SecondMsg; import org.springframework.util.MimeType; -import static java.util.Collections.emptyMap; +import static java.util.Collections.*; import static org.junit.Assert.*; -import static org.springframework.core.ResolvableType.forClass; -import static org.springframework.core.io.buffer.DataBufferUtils.release; +import static org.springframework.core.ResolvableType.*; +import static org.springframework.core.io.buffer.DataBufferUtils.*; /** * Unit tests for {@link ProtobufDecoder}. @@ -223,11 +223,11 @@ public class ProtobufDecoderTests extends AbstractDecoderTestCase dataBuffer(Msg msg) { - return Mono.defer(() -> { + return Mono.fromCallable(() -> { byte[] bytes = msg.toByteArray(); DataBuffer buffer = this.bufferFactory.allocateBuffer(bytes.length); buffer.write(bytes); - return Mono.just(buffer); + return buffer; }); }