DataBuffer fixes in Protobuf codecs

Closes gh-22731
This commit is contained in:
Rossen Stoyanchev
2019-04-08 08:24:37 -04:00
parent 2835424f9d
commit bd956ed75a
4 changed files with 104 additions and 45 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
private static final ConcurrentMap<Class<?>, Method> methodCache = new ConcurrentReferenceHashMap<>();
private final ExtensionRegistry extensionRegistry;
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
@@ -114,8 +115,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
MessageDecoderFunction decoderFunction =
new MessageDecoderFunction(elementType, this.maxMessageSize);
return Flux.from(inputStream)
.flatMapIterable(new MessageDecoderFunction(elementType, this.maxMessageSize));
.flatMapIterable(decoderFunction)
.doOnTerminate(decoderFunction::discard);
}
@Override
@@ -212,12 +217,13 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this.messageBytesToRead -= chunkBytesToRead;
if (this.messageBytesToRead == 0) {
Message.Builder builder = getMessageBuilder(this.elementType.toClass());
ByteBuffer buffer = this.output.asByteBuffer();
builder.mergeFrom(CodedInputStream.newInstance(buffer), extensionRegistry);
messages.add(builder.build());
CodedInputStream stream = CodedInputStream.newInstance(this.output.asByteBuffer());
DataBufferUtils.release(this.output);
this.output = null;
Message message = getMessageBuilder(this.elementType.toClass())
.mergeFrom(stream, extensionRegistry)
.build();
messages.add(message);
}
} while (remainingBytesToRead > 0);
return messages;
@@ -286,6 +292,12 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this.offset = 0;
throw new DecodingException("Cannot parse message size: malformed varint");
}
public void discard() {
if (this.output != null) {
DataBufferUtils.release(this.output);
}
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
package org.springframework.http.codec.protobuf;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -31,6 +30,7 @@ import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageEncoder;
import org.springframework.lang.Nullable;
@@ -73,26 +73,29 @@ public class ProtobufEncoder extends ProtobufCodecSupport implements HttpMessage
public Flux<DataBuffer> encode(Publisher<? extends Message> inputStream, DataBufferFactory bufferFactory,
ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
return Flux
.from(inputStream)
.map(message -> encodeMessage(message, bufferFactory, !(inputStream instanceof Mono)));
}
private DataBuffer encodeMessage(Message message, DataBufferFactory bufferFactory, boolean streaming) {
DataBuffer buffer = bufferFactory.allocateBuffer();
OutputStream outputStream = buffer.asOutputStream();
try {
if (streaming) {
message.writeDelimitedTo(outputStream);
}
else {
message.writeTo(outputStream);
}
return buffer;
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
}
return Flux.from(inputStream)
.map(message -> {
DataBuffer buffer = bufferFactory.allocateBuffer();
boolean release = true;
try {
if (!(inputStream instanceof Mono)) {
message.writeDelimitedTo(buffer.asOutputStream());
}
else {
message.writeTo(buffer.asOutputStream());
}
release = false;
return buffer;
}
catch (IOException ex) {
throw new IllegalStateException("Unexpected I/O error while writing to data buffer", ex);
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
});
}
@Override