AbstractListenerServerHttpResponse improvements

This commit changes writeWithInternal(Publisher<DataBuffer> body).
It is implemented as writeAndFlushWith(Mono.just(body)).
This commit is contained in:
Violeta Georgieva
2016-07-25 14:57:45 +03:00
committed by Rossen Stoyanchev
parent fffea06056
commit 16939b7bc7
3 changed files with 24 additions and 41 deletions

View File

@@ -40,17 +40,7 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
@Override
protected final Mono<Void> writeWithInternal(Publisher<DataBuffer> body) {
if (this.writeCalled.compareAndSet(false, true)) {
Processor<DataBuffer, Void> bodyProcessor = createBodyProcessor();
return Mono.from(subscriber -> {
body.subscribe(bodyProcessor);
bodyProcessor.subscribe(subscriber);
});
} else {
return Mono.error(new IllegalStateException(
"writeWith() or writeAndFlushWith() has already been called"));
}
return writeAndFlushWithInternal(Mono.just(body));
}
@Override
@@ -68,13 +58,6 @@ public abstract class AbstractListenerServerHttpResponse extends AbstractServerH
}
}
/**
* Abstract template method to create a {@code Processor<DataBuffer, Void>} that
* will write the response body to the underlying output. Called from
* {@link #writeWithInternal(Publisher)}.
*/
protected abstract Processor<DataBuffer, Void> createBodyProcessor();
/**
* Abstract template method to create a {@code Processor<Publisher<DataBuffer>, Void>}
* that will write the response body with flushes to the underlying output. Called from

View File

@@ -22,13 +22,13 @@ import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletResponse;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -44,7 +44,7 @@ import org.springframework.util.Assert;
*/
public class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private final AtomicBoolean listenerRegistered = new AtomicBoolean();
private final ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener();
private volatile ResponseBodyProcessor bodyProcessor;
@@ -112,15 +112,17 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
private void registerListener() throws IOException {
if (this.listenerRegistered.compareAndSet(false, true)) {
ResponseBodyWriteListener writeListener = new ResponseBodyWriteListener();
this.response.getOutputStream().setWriteListener(writeListener);
private void registerListener() {
try {
outputStream().setWriteListener(writeListener);
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private void flush() throws IOException {
ServletOutputStream outputStream = this.response.getOutputStream();
ServletOutputStream outputStream = outputStream();
if (outputStream.isReady()) {
try {
outputStream.flush();
@@ -136,22 +138,15 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
}
}
@Override
protected ResponseBodyProcessor createBodyProcessor() {
try {
registerListener();
this.bodyProcessor = new ResponseBodyProcessor(this.response.getOutputStream(),
this.bufferSize);
return this.bodyProcessor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
private ServletOutputStream outputStream() throws IOException {
return this.response.getOutputStream();
}
@Override
protected AbstractResponseBodyFlushProcessor createBodyFlushProcessor() {
return new ResponseBodyFlushProcessor();
protected Processor<Publisher<DataBuffer>, Void> createBodyFlushProcessor() {
Processor<Publisher<DataBuffer>, Void> processor = new ResponseBodyFlushProcessor();
registerListener();
return processor;
}
private class ResponseBodyProcessor extends AbstractResponseBodyProcessor {
@@ -238,7 +233,13 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
@Override
protected Processor<DataBuffer, Void> createBodyProcessor() {
return ServletServerHttpResponse.this.createBodyProcessor();
try {
bodyProcessor = new ResponseBodyProcessor(outputStream(), bufferSize);
return bodyProcessor;
}
catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override

View File

@@ -123,8 +123,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
}
}
@Override
protected ResponseBodyProcessor createBodyProcessor() {
private ResponseBodyProcessor createBodyProcessor() {
if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel();
}