Polish during review of DataBuffer handling

This commit is contained in:
Rossen Stoyanchev
2019-04-09 22:18:44 -04:00
parent bd956ed75a
commit b11e7feff6
15 changed files with 177 additions and 145 deletions

View File

@@ -125,8 +125,9 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
}))
.flatMap(buffer -> {
headers.setContentLength(buffer.readableByteCount());
return message.writeWith(Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
return message.writeWith(
Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
});
}
@@ -162,10 +163,16 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
}
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
return (contentType != null && this.encoder instanceof HttpMessageEncoder &&
((HttpMessageEncoder<?>) this.encoder).getStreamingMediaTypes().stream()
.anyMatch(streamingMediaType -> contentType.isCompatibleWith(streamingMediaType) &&
contentType.getParameters().entrySet().containsAll(streamingMediaType.getParameters().keySet())));
if (contentType == null || !(this.encoder instanceof HttpMessageEncoder)) {
return false;
}
for (MediaType mediaType : ((HttpMessageEncoder<?>) this.encoder).getStreamingMediaTypes()) {
if (contentType.isCompatibleWith(mediaType) &&
contentType.getParameters().entrySet().containsAll(mediaType.getParameters().keySet())) {
return true;
}
}
return false;
}

View File

@@ -56,7 +56,7 @@ public class FormHttpMessageReader extends LoggingCodecSupport
*/
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private static final ResolvableType MULTIVALUE_TYPE =
private static final ResolvableType MULTIVALUE_STRINGS_TYPE =
ResolvableType.forClassWithGenerics(MultiValueMap.class, String.class, String.class);
@@ -83,9 +83,11 @@ public class FormHttpMessageReader extends LoggingCodecSupport
@Override
public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType) {
return ((MULTIVALUE_TYPE.isAssignableFrom(elementType) ||
(elementType.hasUnresolvableGenerics() &&
MultiValueMap.class.isAssignableFrom(elementType.toClass()))) &&
boolean multiValueUnresolved =
elementType.hasUnresolvableGenerics() &&
MultiValueMap.class.isAssignableFrom(elementType.toClass());
return ((MULTIVALUE_STRINGS_TYPE.isAssignableFrom(elementType) || multiValueUnresolved) &&
(mediaType == null || MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)));
}

View File

@@ -164,8 +164,8 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
}
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
Mono<DataBuffer> input = Mono.just(bufferFactory.wrap(bytes));
return this.decoder.decodeToMono(input, dataType, MediaType.TEXT_EVENT_STREAM, hints);
DataBuffer buffer = bufferFactory.wrap(bytes); // wrapping only, no allocation
return this.decoder.decodeToMono(Mono.just(buffer), dataType, MediaType.TEXT_EVENT_STREAM, hints);
}
@Override

View File

@@ -184,7 +184,7 @@ public class ServerSentEventHttpMessageWriter implements HttpMessageWriter<Objec
private Mono<DataBuffer> encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
return Mono.fromCallable(() -> bufferFactory.wrap(bytes)); // wrapping, not allocating
return Mono.just(bufferFactory.wrap(bytes)); // wrapping, not allocating
}
@Override

View File

@@ -111,13 +111,13 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
return Flux.defer(() -> {
boolean release = true;
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
OutputStream outputStream = buffer.asOutputStream();
Class<?> clazz = ClassUtils.getUserClass(value);
try {
OutputStream outputStream = buffer.asOutputStream();
Class<?> clazz = ClassUtils.getUserClass(value);
Marshaller marshaller = initMarshaller(clazz);
marshaller.marshal(value, outputStream);
release = false;
return Mono.fromCallable(() -> buffer); // Rely on doOnDiscard in base class
return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class
}
catch (MarshalException ex) {
return Flux.error(new EncodingException(

View File

@@ -33,8 +33,8 @@ import com.fasterxml.aalto.AsyncXMLStreamReader;
import com.fasterxml.aalto.evt.EventAllocatorImpl;
import com.fasterxml.aalto.stax.InputFactoryImpl;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.AbstractDecoder;
@@ -95,27 +95,30 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
@Override
@SuppressWarnings({"rawtypes", "unchecked"}) // on JDK 9 where XMLEventReader is Iterator<Object>
public Flux<XMLEvent> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
public Flux<XMLEvent> decode(Publisher<DataBuffer> input, ResolvableType elementType,
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) {
Flux<DataBuffer> flux = Flux.from(inputStream);
if (this.useAalto) {
AaltoDataBufferToXmlEvent aaltoMapper = new AaltoDataBufferToXmlEvent();
return flux.flatMap(aaltoMapper)
.doFinally(signalType -> aaltoMapper.endOfInput());
AaltoDataBufferToXmlEvent mapper = new AaltoDataBufferToXmlEvent();
return Flux.from(input)
.flatMapIterable(mapper)
.doFinally(signalType -> mapper.endOfInput());
}
else {
Mono<DataBuffer> singleBuffer = DataBufferUtils.join(flux);
return singleBuffer.
flatMapMany(dataBuffer -> {
return DataBufferUtils.join(input).
flatMapIterable(buffer -> {
try {
InputStream is = dataBuffer.asInputStream();
InputStream is = buffer.asInputStream();
Iterator eventReader = inputFactory.createXMLEventReader(is);
return Flux.fromIterable((Iterable<XMLEvent>) () -> eventReader)
.doFinally(t -> DataBufferUtils.release(dataBuffer));
List<XMLEvent> result = new ArrayList<>();
eventReader.forEachRemaining(event -> result.add((XMLEvent) event));
return result;
}
catch (XMLStreamException ex) {
return Mono.error(ex);
throw Exceptions.propagate(ex);
}
finally {
DataBufferUtils.release(buffer);
}
});
}
@@ -125,7 +128,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
/*
* Separate static class to isolate Aalto dependency.
*/
private static class AaltoDataBufferToXmlEvent implements Function<DataBuffer, Publisher<? extends XMLEvent>> {
private static class AaltoDataBufferToXmlEvent implements Function<DataBuffer, List<? extends XMLEvent>> {
private static final AsyncXMLInputFactory inputFactory =
StaxUtils.createDefensiveInputFactory(InputFactoryImpl::new);
@@ -137,7 +140,7 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
@Override
public Publisher<? extends XMLEvent> apply(DataBuffer dataBuffer) {
public List<? extends XMLEvent> apply(DataBuffer dataBuffer) {
try {
this.streamReader.getInputFeeder().feedInput(dataBuffer.asByteBuffer());
List<XMLEvent> events = new ArrayList<>();
@@ -154,10 +157,10 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
}
}
}
return Flux.fromIterable(events);
return events;
}
catch (XMLStreamException ex) {
return Mono.error(ex);
throw Exceptions.propagate(ex);
}
finally {
DataBufferUtils.release(dataBuffer);

View File

@@ -181,8 +181,7 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public final Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return new ChannelSendOperator<>(body,
writePublisher -> doCommit(() -> writeAndFlushWithInternal(writePublisher)))
return new ChannelSendOperator<>(body, inner -> doCommit(() -> writeAndFlushWithInternal(inner)))
.doOnError(t -> removeContentLength());
}