Fix ProtobufDecoder handling of split message size
This commit introduces a new readMessageSize(DataBuffer input) private method, inspired from CodedInputStream#readRawVarint32(int, InputStream) and adapted for DataBuffer using MessageDecoderFunction fields in order to support use cases where the message size is split between distinct chunks. It also fixes handling of end of streams by using DataBuffer#readableByteCount instead of -1 which is only relevant with InputStream. Issue: SPR-17429
This commit is contained in:
@@ -172,6 +172,8 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||
|
||||
private int messageBytesToRead;
|
||||
|
||||
private int offset;
|
||||
|
||||
|
||||
public MessageDecoderFunction(ResolvableType elementType, int maxMessageSize) {
|
||||
this.elementType = elementType;
|
||||
@@ -188,11 +190,9 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||
|
||||
do {
|
||||
if (this.output == null) {
|
||||
int firstByte = input.read();
|
||||
if (firstByte == -1) {
|
||||
throw new DecodingException("Cannot parse message size");
|
||||
if (!readMessageSize(input)) {
|
||||
return messages;
|
||||
}
|
||||
this.messageBytesToRead = CodedInputStream.readRawVarint32(firstByte, input.asInputStream());
|
||||
if (this.messageBytesToRead > this.maxMessageSize) {
|
||||
throw new DecodingException(
|
||||
"The number of bytes to read from the incoming stream " +
|
||||
@@ -235,6 +235,57 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||
DataBufferUtils.release(input);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse message size as a varint from the input stream, updating {@code messageBytesToRead} and
|
||||
* {@code offset} fields if needed to allow processing of upcoming chunks.
|
||||
* Inspired from {@link CodedInputStream#readRawVarint32(int, java.io.InputStream)}
|
||||
*
|
||||
* @return {code true} when the message size is parsed successfully, {code false} when the message size is
|
||||
* truncated
|
||||
* @see <a href ="https://developers.google.com/protocol-buffers/docs/encoding#varints">Base 128 Varints</a>
|
||||
*/
|
||||
private boolean readMessageSize(DataBuffer input) {
|
||||
if (this.offset == 0) {
|
||||
if (input.readableByteCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
int firstByte = input.read();
|
||||
if ((firstByte & 0x80) == 0) {
|
||||
this.messageBytesToRead = firstByte;
|
||||
return true;
|
||||
}
|
||||
this.messageBytesToRead = firstByte & 0x7f;
|
||||
this.offset = 7;
|
||||
}
|
||||
|
||||
if (this.offset < 32) {
|
||||
for (; this.offset < 32; this.offset += 7) {
|
||||
if (input.readableByteCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
final int b = input.read();
|
||||
this.messageBytesToRead |= (b & 0x7f) << offset;
|
||||
if ((b & 0x80) == 0) {
|
||||
this.offset = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Keep reading up to 64 bits.
|
||||
for (; this.offset < 64; this.offset += 7) {
|
||||
if (input.readableByteCount() == 0) {
|
||||
return false;
|
||||
}
|
||||
final int b = input.read();
|
||||
if ((b & 0x80) == 0) {
|
||||
this.offset = 0;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
this.offset = 0;
|
||||
throw new DecodingException("Cannot parse message size: malformed varint");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user