Add option to decode from a DataBuffer
See gh-22782
This commit is contained in:
@@ -110,20 +110,26 @@ public abstract class AbstractJackson2Decoder extends Jackson2CodecSupport imple
|
||||
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return DataBufferUtils.join(input).map(dataBuffer -> {
|
||||
try {
|
||||
ObjectReader objectReader = getObjectReader(elementType, hints);
|
||||
Object value = objectReader.readValue(dataBuffer.asInputStream());
|
||||
logValue(value, hints);
|
||||
return value;
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw processException(ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
});
|
||||
return DataBufferUtils.join(input)
|
||||
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object decode(DataBuffer dataBuffer, ResolvableType targetType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
|
||||
|
||||
try {
|
||||
ObjectReader objectReader = getObjectReader(targetType, hints);
|
||||
Object value = objectReader.readValue(dataBuffer.asInputStream());
|
||||
logValue(value, hints);
|
||||
return value;
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw processException(ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
private ObjectReader getObjectReader(ResolvableType elementType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
@@ -127,26 +127,32 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
|
||||
public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return DataBufferUtils.join(inputStream).map(dataBuffer -> {
|
||||
try {
|
||||
Message.Builder builder = getMessageBuilder(elementType.toClass());
|
||||
ByteBuffer buffer = dataBuffer.asByteBuffer();
|
||||
builder.mergeFrom(CodedInputStream.newInstance(buffer), this.extensionRegistry);
|
||||
return builder.build();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new DecodingException("I/O error while parsing input stream", ex);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
);
|
||||
return DataBufferUtils.join(inputStream)
|
||||
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message decode(DataBuffer dataBuffer, ResolvableType targetType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
|
||||
|
||||
try {
|
||||
Message.Builder builder = getMessageBuilder(targetType.toClass());
|
||||
ByteBuffer buffer = dataBuffer.asByteBuffer();
|
||||
builder.mergeFrom(CodedInputStream.newInstance(buffer), this.extensionRegistry);
|
||||
return builder.build();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new DecodingException("I/O error while parsing input stream", ex);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Create a new {@code Message.Builder} instance for the given class.
|
||||
* <p>This method uses a ConcurrentHashMap for caching method lookups.
|
||||
|
||||
@@ -143,20 +143,27 @@ public class Jaxb2XmlDecoder extends AbstractDecoder<Object> {
|
||||
public Mono<Object> decodeToMono(Publisher<DataBuffer> input, ResolvableType elementType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
|
||||
|
||||
return DataBufferUtils.join(input).map(dataBuffer -> {
|
||||
try {
|
||||
Iterator eventReader = inputFactory.createXMLEventReader(dataBuffer.asInputStream());
|
||||
List<XMLEvent> events = new ArrayList<>();
|
||||
eventReader.forEachRemaining(event -> events.add((XMLEvent) event));
|
||||
return unmarshal(events, elementType.toClass());
|
||||
}
|
||||
catch (XMLStreamException ex) {
|
||||
throw Exceptions.propagate(ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
});
|
||||
return DataBufferUtils.join(input)
|
||||
.map(dataBuffer -> decode(dataBuffer, elementType, mimeType, hints));
|
||||
}
|
||||
|
||||
@Override
|
||||
@SuppressWarnings({"rawtypes", "unchecked", "cast"}) // XMLEventReader is Iterator<Object> on JDK 9
|
||||
public Object decode(DataBuffer dataBuffer, ResolvableType targetType,
|
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) throws DecodingException {
|
||||
|
||||
try {
|
||||
Iterator eventReader = inputFactory.createXMLEventReader(dataBuffer.asInputStream());
|
||||
List<XMLEvent> events = new ArrayList<>();
|
||||
eventReader.forEachRemaining(event -> events.add((XMLEvent) event));
|
||||
return unmarshal(events, targetType.toClass());
|
||||
}
|
||||
catch (XMLStreamException ex) {
|
||||
throw Exceptions.propagate(ex);
|
||||
}
|
||||
finally {
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
}
|
||||
}
|
||||
|
||||
private Object unmarshal(List<XMLEvent> events, Class<?> outputClass) {
|
||||
|
||||
Reference in New Issue
Block a user