Replacing a couple of calls to Mono.fromCallable with Mono.just which
seems to work with doOnDiscard except when nested inside Flux.defer.
This commit is contained in:
Rossen Stoyanchev
2019-04-16 15:59:28 -04:00
parent 375090bb7c
commit 15b2fb1210
3 changed files with 10 additions and 13 deletions

View File

@@ -125,16 +125,14 @@ public class EncoderHttpMessageWriter<T> implements HttpMessageWriter<T> {
}))
.flatMap(buffer -> {
headers.setContentLength(buffer.readableByteCount());
return message.writeWith(
Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
return message.writeWith(Mono.just(buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release));
});
}
if (isStreamingMediaType(contentType)) {
return message.writeAndFlushWith(body.map(buffer ->
Mono.fromCallable(() -> buffer)
.doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
Mono.just(buffer).doOnDiscard(PooledDataBuffer.class, PooledDataBuffer::release)));
}
return message.writeWith(body);

View File

@@ -108,7 +108,7 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
});
}
return Flux.defer(() -> {
return Mono.fromCallable(() -> {
boolean release = true;
DataBuffer buffer = bufferFactory.allocateBuffer(1024);
try {
@@ -117,21 +117,21 @@ public class Jaxb2XmlEncoder extends AbstractSingleValueEncoder<Object> {
Marshaller marshaller = initMarshaller(clazz);
marshaller.marshal(value, outputStream);
release = false;
return Mono.fromCallable(() -> buffer); // relying on doOnDiscard in base class
return buffer; // relying on doOnDiscard in base class
}
catch (MarshalException ex) {
return Flux.error(new EncodingException(
"Could not marshal " + value.getClass() + " to XML", ex));
throw new EncodingException(
"Could not marshal " + value.getClass() + " to XML", ex);
}
catch (JAXBException ex) {
return Flux.error(new CodecException("Invalid JAXB configuration", ex));
throw new CodecException("Invalid JAXB configuration", ex);
}
finally {
if (release) {
DataBufferUtils.release(buffer);
}
}
});
}).flux();
}
private Marshaller initMarshaller(Class<?> clazz) throws JAXBException {