Fine-tune WebFlux logging at HTTP/WebSocket level
1. Use special category prefix "spring-web.reactivestreams" for logging of reactive streams signals in spring-web, since those are quite verbose would fill the logs at TRACE. 2. Add and use loggers in request and websocket session implementations separate from reactive streams bridge for regular TRACE logging. 3. Improve log messages and add where missing (e.g. for Reactor) Issue: SPR-16898
This commit is contained in:
@@ -47,7 +47,14 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
/**
|
||||
* Special logger for tracing Reactive Streams signals.
|
||||
* <p>This logger is not exposed under "org.springframework" because it is
|
||||
* verbose. To enable this, and other related Reactive Streams loggers in
|
||||
* this package, set "spring-web.reactivestreams" to TRACE.
|
||||
*/
|
||||
protected static Log rsReadLogger = LogFactory.getLog("spring-web.reactivestreams.ReadPublisher");
|
||||
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
|
||||
|
||||
@@ -106,7 +113,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
* container.
|
||||
*/
|
||||
public final void onDataAvailable() {
|
||||
logger.trace(getLogPrefix() + "onDataAvailable");
|
||||
rsReadLogger.trace(getLogPrefix() + "onDataAvailable");
|
||||
this.state.get().onDataAvailable(this);
|
||||
}
|
||||
|
||||
@@ -115,7 +122,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
* all data has been read.
|
||||
*/
|
||||
public void onAllDataRead() {
|
||||
logger.trace(getLogPrefix() + "onAllDataRead");
|
||||
rsReadLogger.trace(getLogPrefix() + "onAllDataRead");
|
||||
this.state.get().onAllDataRead(this);
|
||||
}
|
||||
|
||||
@@ -123,8 +130,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
* Sub-classes can call this to delegate container error notifications.
|
||||
*/
|
||||
public final void onError(Throwable ex) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Connection error: " + ex);
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Connection error: " + ex);
|
||||
}
|
||||
this.state.get().onError(this, ex);
|
||||
}
|
||||
@@ -174,14 +181,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
}
|
||||
Subscriber<? super T> subscriber = this.subscriber;
|
||||
Assert.state(subscriber != null, "No subscriber");
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Publishing data read");
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Publishing data read");
|
||||
}
|
||||
subscriber.onNext(data);
|
||||
}
|
||||
else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "No more data to read");
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "No more data to read");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@@ -191,8 +198,8 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
|
||||
private boolean changeState(State oldState, State newState) {
|
||||
boolean result = this.state.compareAndSet(oldState, newState);
|
||||
if (result && logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
if (result && rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -221,16 +228,16 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
|
||||
@Override
|
||||
public final void request(long n) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + n + " requested");
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + n + " requested");
|
||||
}
|
||||
state.get().request(AbstractListenerReadPublisher.this, n);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void cancel() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Cancellation");
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Cancellation");
|
||||
}
|
||||
state.get().cancel(AbstractListenerReadPublisher.this);
|
||||
}
|
||||
@@ -270,13 +277,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
|
||||
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
|
||||
String logPrefix = publisher.getLogPrefix();
|
||||
if (publisher.completionBeforeDemand) {
|
||||
publisher.logger.trace(logPrefix + "Completed before demand");
|
||||
rsReadLogger.trace(logPrefix + "Completed before demand");
|
||||
publisher.state.get().onAllDataRead(publisher);
|
||||
}
|
||||
Throwable ex = publisher.errorBeforeDemand;
|
||||
if (ex != null) {
|
||||
if (publisher.logger.isTraceEnabled()) {
|
||||
publisher.logger.trace(logPrefix + "Completed with error before demand: " + ex);
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
|
||||
}
|
||||
publisher.state.get().onError(publisher, ex);
|
||||
}
|
||||
|
||||
@@ -42,7 +42,15 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
public abstract class AbstractListenerWriteFlushProcessor<T> implements Processor<Publisher<? extends T>, Void> {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
/**
|
||||
* Special logger for tracing Reactive Streams signals.
|
||||
* <p>This logger is not exposed under "org.springframework" because it is
|
||||
* verbose. To enable this, and other related Reactive Streams loggers in
|
||||
* this package, set "spring-web.reactivestreams" to TRACE.
|
||||
*/
|
||||
protected static final Log rsWriteFlushLogger =
|
||||
LogFactory.getLog("spring-web.reactivestreams.WriteFlushProcessor");
|
||||
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
|
||||
|
||||
@@ -88,8 +96,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
|
||||
@Override
|
||||
public final void onNext(Publisher<? extends T> publisher) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Received onNext publisher");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher");
|
||||
}
|
||||
this.state.get().onNext(this, publisher);
|
||||
}
|
||||
@@ -100,8 +108,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
*/
|
||||
@Override
|
||||
public final void onError(Throwable ex) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Received onError: " + ex);
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex);
|
||||
}
|
||||
this.state.get().onError(this, ex);
|
||||
}
|
||||
@@ -112,8 +120,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
*/
|
||||
@Override
|
||||
public final void onComplete() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Received onComplete");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete");
|
||||
}
|
||||
this.state.get().onComplete(this);
|
||||
}
|
||||
@@ -132,8 +140,8 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
* container to cancel the upstream subscription.
|
||||
*/
|
||||
protected void cancel() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Received request to cancel");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel");
|
||||
}
|
||||
if (this.subscription != null) {
|
||||
this.subscription.cancel();
|
||||
@@ -190,16 +198,16 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
|
||||
private boolean changeState(State oldState, State newState) {
|
||||
boolean result = this.state.compareAndSet(oldState, newState);
|
||||
if (result && logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
if (result && rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private void flushIfPossible() {
|
||||
boolean result = isWritePossible();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "isWritePossible[" + result + "]");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "isWritePossible[" + result + "]");
|
||||
}
|
||||
if (result) {
|
||||
onFlushPossible();
|
||||
@@ -400,12 +408,10 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
|
||||
|
||||
@Override
|
||||
public void onComplete() {
|
||||
Log logger = this.processor.logger;
|
||||
AtomicReference<State> state = this.processor.state;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.processor.getLogPrefix() + state + " writeComplete");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete");
|
||||
}
|
||||
state.get().writeComplete(this.processor);
|
||||
this.processor.state.get().writeComplete(this.processor);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +44,14 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, Void> {
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
/**
|
||||
* Special logger for tracing Reactive Streams signals.
|
||||
* <p>This logger is not exposed under "org.springframework" because it is
|
||||
* verbose. To enable this, and other related Reactive Streams loggers in
|
||||
* this package, set "spring-web.reactivestreams" to TRACE.
|
||||
*/
|
||||
protected static final Log rsWriteLogger = LogFactory.getLog("spring-web.reactivestreams.WriteProcessor");
|
||||
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
|
||||
|
||||
@@ -93,8 +100,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
|
||||
@Override
|
||||
public final void onNext(T data) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Item to write");
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Item to write");
|
||||
}
|
||||
this.state.get().onNext(this, data);
|
||||
}
|
||||
@@ -105,8 +112,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
*/
|
||||
@Override
|
||||
public final void onError(Throwable ex) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Write source error: " + ex);
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Write source error: " + ex);
|
||||
}
|
||||
this.state.get().onError(this, ex);
|
||||
}
|
||||
@@ -117,8 +124,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
*/
|
||||
@Override
|
||||
public final void onComplete() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "No more items to write");
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "No more items to write");
|
||||
}
|
||||
this.state.get().onComplete(this);
|
||||
}
|
||||
@@ -129,8 +136,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
* container.
|
||||
*/
|
||||
public final void onWritePossible() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "onWritePossible");
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "onWritePossible");
|
||||
}
|
||||
this.state.get().onWritePossible(this);
|
||||
}
|
||||
@@ -140,7 +147,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
* container to cancel the upstream subscription.
|
||||
*/
|
||||
public void cancel() {
|
||||
logger.trace(getLogPrefix() + "Cancellation");
|
||||
rsWriteLogger.trace(getLogPrefix() + "Cancellation");
|
||||
if (this.subscription != null) {
|
||||
this.subscription.cancel();
|
||||
}
|
||||
@@ -223,8 +230,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
|
||||
private boolean changeState(State oldState, State newState) {
|
||||
boolean result = this.state.compareAndSet(oldState, newState);
|
||||
if (result && logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
if (result && rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + oldState + " -> " + newState);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@@ -247,8 +254,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
|
||||
|
||||
private void writeIfPossible() {
|
||||
boolean result = isWritePossible();
|
||||
if (!result && logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Writing not possible");
|
||||
if (!result && rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "isWritePossible: false");
|
||||
}
|
||||
if (result) {
|
||||
onWritePossible();
|
||||
|
||||
@@ -43,7 +43,7 @@ import org.springframework.util.StringUtils;
|
||||
*/
|
||||
public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ServerHttpRequest.class);
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
private static final Pattern QUERY_PATTERN = Pattern.compile("([^&=]+)(=?)([^&]+)?");
|
||||
|
||||
@@ -156,8 +156,8 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
|
||||
}
|
||||
catch (UnsupportedEncodingException ex) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn(getLogPrefix() + "Could not decode query param [" + value + "] as 'UTF-8'. " +
|
||||
"Falling back on default encoding; exception message: " + ex.getMessage());
|
||||
logger.warn(getLogPrefix() + "Could not decode query value [" + value + "] as 'UTF-8'. " +
|
||||
"Falling back on default encoding: " + ex.getMessage());
|
||||
}
|
||||
return URLDecoder.decode(value);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -55,6 +57,8 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
|
||||
*/
|
||||
private enum State {NEW, COMMITTING, COMMITTED}
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
|
||||
private final DataBufferFactory dataBufferFactory;
|
||||
|
||||
|
||||
@@ -55,18 +55,16 @@ public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest,
|
||||
public Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {
|
||||
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());
|
||||
try {
|
||||
ServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
|
||||
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);
|
||||
ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);
|
||||
ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory, request);
|
||||
|
||||
if (request.getMethod() == HttpMethod.HEAD) {
|
||||
response = new HttpHeadResponseDecorator(response);
|
||||
}
|
||||
|
||||
String logPrefix = ((ReactorServerHttpRequest) request).getLogPrefix();
|
||||
|
||||
return this.httpHandler.handle(request, response)
|
||||
.doOnError(ex -> logger.trace(logPrefix + "Failed to complete: " + ex.getMessage()))
|
||||
.doOnSuccess(aVoid -> logger.trace(logPrefix + "Handling completed"));
|
||||
.doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage()))
|
||||
.doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));
|
||||
}
|
||||
catch (URISyntaxException ex) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
||||
@@ -165,7 +165,13 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
|
||||
|
||||
@Override
|
||||
public Flux<DataBuffer> getBody() {
|
||||
return this.request.receive().retain().map(this.bufferFactory::wrap);
|
||||
return this.request.receive().retain()
|
||||
.doOnNext(buffer -> {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Read " + buffer.readableBytes() + " bytes");
|
||||
}
|
||||
})
|
||||
.map(this.bufferFactory::wrap);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
||||
@@ -45,11 +45,16 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze
|
||||
|
||||
private final HttpServerResponse response;
|
||||
|
||||
private final ReactorServerHttpRequest request;
|
||||
|
||||
|
||||
public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory,
|
||||
ReactorServerHttpRequest request) {
|
||||
|
||||
public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
|
||||
super(bufferFactory);
|
||||
Assert.notNull(response, "HttpServerResponse must not be null");
|
||||
this.response = response;
|
||||
this.request = request;
|
||||
}
|
||||
|
||||
|
||||
@@ -70,15 +75,12 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze
|
||||
|
||||
@Override
|
||||
protected Mono<Void> writeWithInternal(Publisher<? extends DataBuffer> publisher) {
|
||||
Publisher<ByteBuf> body = toByteBufs(publisher);
|
||||
return this.response.send(body).then();
|
||||
return this.response.send(toByteBufs(publisher)).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Mono<Void> writeAndFlushWithInternal(Publisher<? extends Publisher<? extends DataBuffer>> publisher) {
|
||||
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
|
||||
.map(ReactorServerHttpResponse::toByteBufs);
|
||||
return this.response.sendGroups(body).then();
|
||||
return this.response.sendGroups(Flux.from(publisher).map(this::toByteBufs)).then();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -116,8 +118,13 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze
|
||||
return doCommit(() -> this.response.sendFile(file, position, count).then());
|
||||
}
|
||||
|
||||
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
|
||||
return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf);
|
||||
private Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
|
||||
return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf)
|
||||
.doOnNext(byteBuf -> {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("Writing " + byteBuf.readableBytes() + " bytes");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@ import javax.servlet.http.Cookie;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
@@ -60,9 +59,6 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
|
||||
static final DataBuffer EOF_BUFFER = new DefaultDataBufferFactory().allocateBuffer(0);
|
||||
|
||||
|
||||
protected final Log logger = LogFactory.getLog(getClass());
|
||||
|
||||
|
||||
private final HttpServletRequest request;
|
||||
|
||||
private final RequestBodyPublisher bodyPublisher;
|
||||
@@ -204,7 +200,13 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
|
||||
DataBuffer readFromInputStream() throws IOException {
|
||||
int read = this.request.getInputStream().read(this.buffer);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "InputStream.read returned " + read + (read != -1 ? " bytes" : ""));
|
||||
logger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : ""));
|
||||
}
|
||||
else {
|
||||
Log rsReadLogger = AbstractListenerReadPublisher.rsReadLogger;
|
||||
if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : ""));
|
||||
}
|
||||
}
|
||||
|
||||
if (read > 0) {
|
||||
|
||||
@@ -281,8 +281,8 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
|
||||
|
||||
@Override
|
||||
protected void flush() throws IOException {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "flush");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "Flush attempt");
|
||||
}
|
||||
ServletServerHttpResponse.this.flush();
|
||||
}
|
||||
@@ -319,29 +319,33 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
|
||||
@Override
|
||||
protected boolean write(DataBuffer dataBuffer) throws IOException {
|
||||
if (ServletServerHttpResponse.this.flushOnNext) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "flush");
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Flush attempt");
|
||||
}
|
||||
flush();
|
||||
}
|
||||
|
||||
boolean ready = ServletServerHttpResponse.this.isWritePossible();
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "write: " + dataBuffer + " ready: " + ready);
|
||||
}
|
||||
int remaining = dataBuffer.readableByteCount();
|
||||
if (ready && remaining > 0) {
|
||||
int written = writeToOutputStream(dataBuffer);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "written: " + written + " total: " + remaining);
|
||||
logger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes");
|
||||
}
|
||||
else if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes");
|
||||
}
|
||||
if (written == remaining) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "releaseData: " + dataBuffer);
|
||||
}
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else {
|
||||
if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "ready: " + ready + ", remaining: " + remaining);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import io.undertow.connector.PooledByteBuffer;
|
||||
import io.undertow.server.HttpServerExchange;
|
||||
import io.undertow.server.handlers.Cookie;
|
||||
import io.undertow.util.HeaderValues;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.xnio.channels.StreamSourceChannel;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
@@ -66,7 +67,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||
|
||||
super(initUri(exchange), "", initHeaders(exchange));
|
||||
this.exchange = exchange;
|
||||
this.body = new RequestBodyPublisher(exchange, bufferFactory, getLogPrefix());
|
||||
this.body = new RequestBodyPublisher(exchange, bufferFactory);
|
||||
this.body.registerListeners(exchange);
|
||||
}
|
||||
|
||||
@@ -134,7 +135,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||
}
|
||||
|
||||
|
||||
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
|
||||
private class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
|
||||
|
||||
private final StreamSourceChannel channel;
|
||||
|
||||
@@ -142,8 +143,9 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||
|
||||
private final ByteBufferPool byteBufferPool;
|
||||
|
||||
public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory, String logPrefix) {
|
||||
super(logPrefix);
|
||||
|
||||
public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
|
||||
super(UndertowServerHttpRequest.this.getLogPrefix());
|
||||
this.channel = exchange.getRequestChannel();
|
||||
this.bufferFactory = bufferFactory;
|
||||
this.byteBufferPool = exchange.getConnection().getByteBufferPool();
|
||||
@@ -178,10 +180,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
|
||||
boolean release = true;
|
||||
try {
|
||||
ByteBuffer byteBuffer = pooledByteBuffer.getBuffer();
|
||||
|
||||
int read = this.channel.read(byteBuffer);
|
||||
|
||||
Log logger = UndertowServerHttpRequest.this.logger;
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "Channel.read returned " + read + (read != -1 ? " bytes" : ""));
|
||||
logger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : ""));
|
||||
}
|
||||
else if (rsReadLogger.isTraceEnabled()) {
|
||||
rsReadLogger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : ""));
|
||||
}
|
||||
|
||||
if (read > 0) {
|
||||
|
||||
@@ -173,9 +173,6 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
|
||||
if (buffer == null) {
|
||||
return false;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "write (" + dataBuffer.readableByteCount() + " bytes)");
|
||||
}
|
||||
|
||||
// Track write listener calls from here on..
|
||||
this.writePossible = false;
|
||||
@@ -184,7 +181,10 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
|
||||
int written = writeByteBuffer(buffer);
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "written " + written + ", " + total + " remaining");
|
||||
logger.trace(getLogPrefix() + "Wrote " + written + " of " + total + " bytes");
|
||||
}
|
||||
else if (rsWriteLogger.isTraceEnabled()) {
|
||||
rsWriteLogger.trace(getLogPrefix() + "Wrote " + written + " of " + total + " bytes");
|
||||
}
|
||||
if (written != total) {
|
||||
return false;
|
||||
@@ -193,9 +193,6 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
|
||||
// We wrote all, so can still write more..
|
||||
this.writePossible = true;
|
||||
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "releaseData: " + dataBuffer);
|
||||
}
|
||||
DataBufferUtils.release(dataBuffer);
|
||||
this.byteBuffer = null;
|
||||
return true;
|
||||
@@ -252,8 +249,8 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
|
||||
protected void flush() throws IOException {
|
||||
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
|
||||
if (channel != null) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(getLogPrefix() + "flush");
|
||||
if (rsWriteFlushLogger.isTraceEnabled()) {
|
||||
rsWriteFlushLogger.trace(getLogPrefix() + "flush");
|
||||
}
|
||||
channel.flush();
|
||||
}
|
||||
|
||||
@@ -38,7 +38,15 @@ import org.springframework.util.Assert;
|
||||
*/
|
||||
class WriteResultPublisher implements Publisher<Void> {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(WriteResultPublisher.class);
|
||||
/**
|
||||
* Special logger for tracing Reactive Streams signals.
|
||||
* <p>This logger is not exposed under "org.springframework" because it is
|
||||
* verbose. To enable this, and other related Reactive Streams loggers in
|
||||
* this package, set "spring-web.reactivestreams" to TRACE.
|
||||
*/
|
||||
private static final Log rsWriteResultLogger =
|
||||
LogFactory.getLog("spring-web.reactivestreams.WriteResultPublisher");
|
||||
|
||||
|
||||
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
|
||||
|
||||
@@ -60,8 +68,8 @@ class WriteResultPublisher implements Publisher<Void> {
|
||||
|
||||
@Override
|
||||
public final void subscribe(Subscriber<? super Void> subscriber) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.logPrefix + this.state + " subscribe: " + subscriber);
|
||||
if (rsWriteResultLogger.isTraceEnabled()) {
|
||||
rsWriteResultLogger.trace(this.logPrefix + this.state + " subscribe: " + subscriber);
|
||||
}
|
||||
this.state.get().subscribe(this, subscriber);
|
||||
}
|
||||
@@ -70,8 +78,8 @@ class WriteResultPublisher implements Publisher<Void> {
|
||||
* Invoke this to delegate a completion signal to the subscriber.
|
||||
*/
|
||||
public void publishComplete() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.logPrefix + this.state + " publishComplete");
|
||||
if (rsWriteResultLogger.isTraceEnabled()) {
|
||||
rsWriteResultLogger.trace(this.logPrefix + this.state + " publishComplete");
|
||||
}
|
||||
this.state.get().publishComplete(this);
|
||||
}
|
||||
@@ -80,8 +88,8 @@ class WriteResultPublisher implements Publisher<Void> {
|
||||
* Invoke this to delegate an error signal to the subscriber.
|
||||
*/
|
||||
public void publishError(Throwable t) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.logPrefix + this.state + " publishError: " + t);
|
||||
if (rsWriteResultLogger.isTraceEnabled()) {
|
||||
rsWriteResultLogger.trace(this.logPrefix + this.state + " publishError: " + t);
|
||||
}
|
||||
this.state.get().publishError(this, t);
|
||||
}
|
||||
@@ -105,16 +113,16 @@ class WriteResultPublisher implements Publisher<Void> {
|
||||
|
||||
@Override
|
||||
public final void request(long n) {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.publisher.logPrefix + state() + " request: " + n);
|
||||
if (rsWriteResultLogger.isTraceEnabled()) {
|
||||
rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " request: " + n);
|
||||
}
|
||||
state().request(this.publisher, n);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void cancel() {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace(this.publisher.logPrefix + state() + " cancel");
|
||||
if (rsWriteResultLogger.isTraceEnabled()) {
|
||||
rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " cancel");
|
||||
}
|
||||
state().cancel(this.publisher);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user