Server adapters release buffers on error/cancel

Review and update Servlet and Undertow adapters to release any data
buffers they be holding on to at the time of error or cancellation.

Also remove onDiscard hooks from Reactor and Undertow request body.
For Reactor we expect it to be handled. For Undertow there isn't
any Reactor Core upstream for the callback to be useful.

Issue: SPR-17410
This commit is contained in:
Rossen Stoyanchev
2018-10-19 21:45:14 -04:00
parent 149d416e8e
commit 862dd23975
10 changed files with 389 additions and 42 deletions

View File

@@ -163,6 +163,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
*/
protected abstract void readingPaused();
/**
* Invoked after an I/O read error from the underlying server or after a
* cancellation signal from the downstream consumer to allow sub-classes
* to discard any current cached data they might have.
* @since 5.1.2
*/
protected abstract void discardData();
// Private methods for use in State...
@@ -416,7 +424,10 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
}
<T> void cancel(AbstractListenerReadPublisher<T> publisher) {
if (!publisher.changeState(this, COMPLETED)) {
if (publisher.changeState(this, COMPLETED)) {
publisher.discardData();
}
else {
publisher.state.get().cancel(publisher);
}
}
@@ -439,6 +450,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable t) {
if (publisher.changeState(this, COMPLETED)) {
publisher.discardData();
Subscriber<? super T> s = publisher.subscriber;
if (s != null) {
s.onError(t);

View File

@@ -158,6 +158,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
// Technically, cancellation from the result subscriber should be propagated
// to the upstream subscription. In practice, HttpHandler server adapters
// don't have a reason to cancel the result subscription.
this.resultPublisher.subscribe(subscriber);
}
@@ -176,8 +179,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* data item for writing once that is possible.
*/
protected void dataReceived(T data) {
if (this.currentData != null) {
throw new IllegalStateException("Current data not processed yet: " + this.currentData);
T prev = this.currentData;
if (prev != null) {
// This shouldn't happen:
// 1. dataReceived can only be called from REQUESTED state
// 2. currentData is cleared before requesting
discardData(data);
cancel();
onError(new IllegalStateException("Received new data while current not processed yet."));
}
this.currentData = data;
}
@@ -226,6 +235,16 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
protected void writingFailed(Throwable ex) {
}
/**
* Invoked after any error (either from the upstream write Publisher, or
* from I/O operations to the underlying server) and cancellation
* to discard in-flight data that was in
* the process of being written when the error took place.
* @param data the data to be released
* @since 5.1.2
*/
protected abstract void discardData(T data);
// Private methods for use from State's...
@@ -245,6 +264,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private void changeStateToComplete(State oldState) {
if (changeState(oldState, State.COMPLETED)) {
discardCurrentData();
writingComplete();
this.resultPublisher.publishComplete();
}
@@ -263,6 +283,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
}
private void discardCurrentData() {
T data = this.currentData;
this.currentData = null;
if (data != null) {
discardData(data);
}
}
/**
* Represents a state for the {@link Processor} to be in.
@@ -378,11 +406,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
}
public <T> void onNext(AbstractListenerWriteProcessor<T> processor, T data) {
throw new IllegalStateException(toString());
processor.discardData(data);
processor.cancel();
processor.onError(new IllegalStateException("Illegal onNext without demand"));
}
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.discardCurrentData();
processor.writingComplete();
processor.resultPublisher.publishError(ex);
}

View File

@@ -29,9 +29,7 @@ import reactor.netty.Connection;
import reactor.netty.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.core.io.buffer.PooledDataBuffer;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
@@ -165,8 +163,7 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
Flux<DataBuffer> body = this.request.receive().retain().map(this.bufferFactory::wrap);
return body.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return this.request.receive().retain().map(this.bufferFactory::wrap);
}
@SuppressWarnings("unchecked")

View File

@@ -302,6 +302,11 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
// no-op
}
@Override
protected void discardData() {
// Nothing to discard since we pass data buffers on immediately..
}
private class RequestBodyPublisherReadListener implements ReadListener {

View File

@@ -334,6 +334,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
boolean ready = ServletServerHttpResponse.this.isWritePossible();
int remaining = dataBuffer.readableByteCount();
if (ready && remaining > 0) {
// In case of IOException, onError handling should call discardData(DataBuffer)..
int written = writeToOutputStream(dataBuffer);
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes");
@@ -359,6 +360,11 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
protected void writingComplete() {
bodyProcessor = null;
}
@Override
protected void discardData(DataBuffer dataBuffer) {
DataBufferUtils.release(dataBuffer);
}
}
}

View File

@@ -116,8 +116,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
return Flux.from(this.body)
.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
return Flux.from(this.body);
}
@SuppressWarnings("unchecked")
@@ -201,6 +200,10 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
}
}
@Override
protected void discardData() {
// Nothing to discard since we pass data buffers on immediately..
}
}
private static class UndertowDataBuffer implements PooledDataBuffer {

View File

@@ -181,6 +181,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
// Track write listener calls from here on..
this.writePossible = false;
// In case of IOException, onError handling should call discardData(DataBuffer)..
int total = buffer.remaining();
int written = writeByteBuffer(buffer);
@@ -235,6 +236,11 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
cancel();
onError(ex);
}
@Override
protected void discardData(DataBuffer dataBuffer) {
DataBufferUtils.release(dataBuffer);
}
}