diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 3a963712c8..a597a0f142 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -47,7 +47,14 @@ import org.springframework.util.Assert; */ public abstract class AbstractListenerReadPublisher implements Publisher { - protected final Log logger = LogFactory.getLog(getClass()); + /** + * Special logger for tracing Reactive Streams signals. + *

This logger is not exposed under "org.springframework" because it is + * verbose. To enable this, and other related Reactive Streams loggers in + * this package, set "spring-web.reactivestreams" to TRACE. + */ + protected static Log rsReadLogger = LogFactory.getLog("spring-web.reactivestreams.ReadPublisher"); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -106,7 +113,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * container. */ public final void onDataAvailable() { - logger.trace(getLogPrefix() + "onDataAvailable"); + rsReadLogger.trace(getLogPrefix() + "onDataAvailable"); this.state.get().onDataAvailable(this); } @@ -115,7 +122,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * all data has been read. */ public void onAllDataRead() { - logger.trace(getLogPrefix() + "onAllDataRead"); + rsReadLogger.trace(getLogPrefix() + "onAllDataRead"); this.state.get().onAllDataRead(this); } @@ -123,8 +130,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * Sub-classes can call this to delegate container error notifications. */ public final void onError(Throwable ex) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Connection error: " + ex); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Connection error: " + ex); } this.state.get().onError(this, ex); } @@ -174,14 +181,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { } Subscriber subscriber = this.subscriber; Assert.state(subscriber != null, "No subscriber"); - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Publishing data read"); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Publishing data read"); } subscriber.onNext(data); } else { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "No more data to read"); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "No more data to read"); } return true; } @@ -191,8 +198,8 @@ public abstract class AbstractListenerReadPublisher implements Publisher { private boolean changeState(State oldState, State newState) { boolean result = this.state.compareAndSet(oldState, newState); - if (result && logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + oldState + " -> " + newState); + if (result && rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + oldState + " -> " + newState); } return result; } @@ -221,16 +228,16 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override public final void request(long n) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + n + " requested"); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + n + " requested"); } state.get().request(AbstractListenerReadPublisher.this, n); } @Override public final void cancel() { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Cancellation"); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Cancellation"); } state.get().cancel(AbstractListenerReadPublisher.this); } @@ -270,13 +277,13 @@ public abstract class AbstractListenerReadPublisher implements Publisher { // Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND String logPrefix = publisher.getLogPrefix(); if (publisher.completionBeforeDemand) { - publisher.logger.trace(logPrefix + "Completed before demand"); + rsReadLogger.trace(logPrefix + "Completed before demand"); publisher.state.get().onAllDataRead(publisher); } Throwable ex = publisher.errorBeforeDemand; if (ex != null) { - if (publisher.logger.isTraceEnabled()) { - publisher.logger.trace(logPrefix + "Completed with error before demand: " + ex); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex); } publisher.state.get().onError(publisher, ex); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 56bf254f6b..d3f512a9fb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -42,7 +42,15 @@ import org.springframework.util.Assert; */ public abstract class AbstractListenerWriteFlushProcessor implements Processor, Void> { - protected final Log logger = LogFactory.getLog(getClass()); + /** + * Special logger for tracing Reactive Streams signals. + *

This logger is not exposed under "org.springframework" because it is + * verbose. To enable this, and other related Reactive Streams loggers in + * this package, set "spring-web.reactivestreams" to TRACE. + */ + protected static final Log rsWriteFlushLogger = + LogFactory.getLog("spring-web.reactivestreams.WriteFlushProcessor"); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -88,8 +96,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public final void onNext(Publisher publisher) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Received onNext publisher"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "Received onNext publisher"); } this.state.get().onNext(this, publisher); } @@ -100,8 +108,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ @Override public final void onError(Throwable ex) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Received onError: " + ex); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "Received onError: " + ex); } this.state.get().onError(this, ex); } @@ -112,8 +120,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo */ @Override public final void onComplete() { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Received onComplete"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "Received onComplete"); } this.state.get().onComplete(this); } @@ -132,8 +140,8 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo * container to cancel the upstream subscription. */ protected void cancel() { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Received request to cancel"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "Received request to cancel"); } if (this.subscription != null) { this.subscription.cancel(); @@ -190,16 +198,16 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo private boolean changeState(State oldState, State newState) { boolean result = this.state.compareAndSet(oldState, newState); - if (result && logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + oldState + " -> " + newState); + if (result && rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + newState); } return result; } private void flushIfPossible() { boolean result = isWritePossible(); - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "isWritePossible[" + result + "]"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "isWritePossible[" + result + "]"); } if (result) { onFlushPossible(); @@ -400,12 +408,10 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo @Override public void onComplete() { - Log logger = this.processor.logger; - AtomicReference state = this.processor.state; - if (logger.isTraceEnabled()) { - logger.trace(this.processor.getLogPrefix() + state + " writeComplete"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(this.processor.getLogPrefix() + this.processor.state + " writeComplete"); } - state.get().writeComplete(this.processor); + this.processor.state.get().writeComplete(this.processor); } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 020990a263..21f67bc74c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -44,7 +44,14 @@ import org.springframework.util.Assert; */ public abstract class AbstractListenerWriteProcessor implements Processor { - protected final Log logger = LogFactory.getLog(getClass()); + /** + * Special logger for tracing Reactive Streams signals. + *

This logger is not exposed under "org.springframework" because it is + * verbose. To enable this, and other related Reactive Streams loggers in + * this package, set "spring-web.reactivestreams" to TRACE. + */ + protected static final Log rsWriteLogger = LogFactory.getLog("spring-web.reactivestreams.WriteProcessor"); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -93,8 +100,8 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements Processor implements Processor implements Processor implements Processor " + newState); + if (result && rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + oldState + " -> " + newState); } return result; } @@ -247,8 +254,8 @@ public abstract class AbstractListenerWriteProcessor implements Processor apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) { NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc()); try { - ServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory); - ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory); + ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory); + ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory, request); if (request.getMethod() == HttpMethod.HEAD) { response = new HttpHeadResponseDecorator(response); } - String logPrefix = ((ReactorServerHttpRequest) request).getLogPrefix(); - return this.httpHandler.handle(request, response) - .doOnError(ex -> logger.trace(logPrefix + "Failed to complete: " + ex.getMessage())) - .doOnSuccess(aVoid -> logger.trace(logPrefix + "Handling completed")); + .doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())) + .doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed")); } catch (URISyntaxException ex) { if (logger.isDebugEnabled()) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index a0c343da62..fa89e4b23a 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -165,7 +165,13 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { - return this.request.receive().retain().map(this.bufferFactory::wrap); + return this.request.receive().retain() + .doOnNext(buffer -> { + if (logger.isTraceEnabled()) { + logger.trace(getLogPrefix() + "Read " + buffer.readableBytes() + " bytes"); + } + }) + .map(this.bufferFactory::wrap); } @SuppressWarnings("unchecked") diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 591924dce1..24aee74f1d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -45,11 +45,16 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze private final HttpServerResponse response; + private final ReactorServerHttpRequest request; + + + public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory, + ReactorServerHttpRequest request) { - public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) { super(bufferFactory); Assert.notNull(response, "HttpServerResponse must not be null"); this.response = response; + this.request = request; } @@ -70,15 +75,12 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze @Override protected Mono writeWithInternal(Publisher publisher) { - Publisher body = toByteBufs(publisher); - return this.response.send(body).then(); + return this.response.send(toByteBufs(publisher)).then(); } @Override protected Mono writeAndFlushWithInternal(Publisher> publisher) { - Publisher> body = Flux.from(publisher) - .map(ReactorServerHttpResponse::toByteBufs); - return this.response.sendGroups(body).then(); + return this.response.sendGroups(Flux.from(publisher).map(this::toByteBufs)).then(); } @Override @@ -116,8 +118,13 @@ class ReactorServerHttpResponse extends AbstractServerHttpResponse implements Ze return doCommit(() -> this.response.sendFile(file, position, count).then()); } - private static Publisher toByteBufs(Publisher dataBuffers) { - return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf); + private Publisher toByteBufs(Publisher dataBuffers) { + return Flux.from(dataBuffers).map(NettyDataBufferFactory::toByteBuf) + .doOnNext(byteBuf -> { + if (logger.isTraceEnabled()) { + logger.trace("Writing " + byteBuf.readableBytes() + " bytes"); + } + }); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index b386edae36..a439ab4f40 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -33,7 +33,6 @@ import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletRequest; import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; import org.springframework.core.io.buffer.DataBuffer; @@ -60,9 +59,6 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { static final DataBuffer EOF_BUFFER = new DefaultDataBufferFactory().allocateBuffer(0); - protected final Log logger = LogFactory.getLog(getClass()); - - private final HttpServletRequest request; private final RequestBodyPublisher bodyPublisher; @@ -204,7 +200,13 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { DataBuffer readFromInputStream() throws IOException { int read = this.request.getInputStream().read(this.buffer); if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "InputStream.read returned " + read + (read != -1 ? " bytes" : "")); + logger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : "")); + } + else { + Log rsReadLogger = AbstractListenerReadPublisher.rsReadLogger; + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : "")); + } } if (read > 0) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index effcecbf3a..81d9c0c549 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -281,8 +281,8 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { @Override protected void flush() throws IOException { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "flush"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "Flush attempt"); } ServletServerHttpResponse.this.flush(); } @@ -319,29 +319,33 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse { @Override protected boolean write(DataBuffer dataBuffer) throws IOException { if (ServletServerHttpResponse.this.flushOnNext) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "flush"); + if (rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "Flush attempt"); } flush(); } + boolean ready = ServletServerHttpResponse.this.isWritePossible(); - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "write: " + dataBuffer + " ready: " + ready); - } int remaining = dataBuffer.readableByteCount(); if (ready && remaining > 0) { int written = writeToOutputStream(dataBuffer); if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "written: " + written + " total: " + remaining); + logger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes"); + } + else if (rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "Wrote " + written + " of " + remaining + " bytes"); } if (written == remaining) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "releaseData: " + dataBuffer); - } DataBufferUtils.release(dataBuffer); return true; } } + else { + if (rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "ready: " + ready + ", remaining: " + remaining); + } + } + return false; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 72be9f80a8..2ce2f82371 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -31,6 +31,7 @@ import io.undertow.connector.PooledByteBuffer; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; import io.undertow.util.HeaderValues; +import org.apache.commons.logging.Log; import org.xnio.channels.StreamSourceChannel; import reactor.core.publisher.Flux; @@ -66,7 +67,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { super(initUri(exchange), "", initHeaders(exchange)); this.exchange = exchange; - this.body = new RequestBodyPublisher(exchange, bufferFactory, getLogPrefix()); + this.body = new RequestBodyPublisher(exchange, bufferFactory); this.body.registerListeners(exchange); } @@ -134,7 +135,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { } - private static class RequestBodyPublisher extends AbstractListenerReadPublisher { + private class RequestBodyPublisher extends AbstractListenerReadPublisher { private final StreamSourceChannel channel; @@ -142,8 +143,9 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { private final ByteBufferPool byteBufferPool; - public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory, String logPrefix) { - super(logPrefix); + + public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory) { + super(UndertowServerHttpRequest.this.getLogPrefix()); this.channel = exchange.getRequestChannel(); this.bufferFactory = bufferFactory; this.byteBufferPool = exchange.getConnection().getByteBufferPool(); @@ -178,10 +180,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { boolean release = true; try { ByteBuffer byteBuffer = pooledByteBuffer.getBuffer(); - int read = this.channel.read(byteBuffer); + + Log logger = UndertowServerHttpRequest.this.logger; if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Channel.read returned " + read + (read != -1 ? " bytes" : "")); + logger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : "")); + } + else if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Read " + read + (read != -1 ? " bytes" : "")); } if (read > 0) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index b60ba90360..1ad997896c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -173,9 +173,6 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl if (buffer == null) { return false; } - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "write (" + dataBuffer.readableByteCount() + " bytes)"); - } // Track write listener calls from here on.. this.writePossible = false; @@ -184,7 +181,10 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl int written = writeByteBuffer(buffer); if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "written " + written + ", " + total + " remaining"); + logger.trace(getLogPrefix() + "Wrote " + written + " of " + total + " bytes"); + } + else if (rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "Wrote " + written + " of " + total + " bytes"); } if (written != total) { return false; @@ -193,9 +193,6 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl // We wrote all, so can still write more.. this.writePossible = true; - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "releaseData: " + dataBuffer); - } DataBufferUtils.release(dataBuffer); this.byteBuffer = null; return true; @@ -252,8 +249,8 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl protected void flush() throws IOException { StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel; if (channel != null) { - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "flush"); + if (rsWriteFlushLogger.isTraceEnabled()) { + rsWriteFlushLogger.trace(getLogPrefix() + "flush"); } channel.flush(); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java index 37bb8ceebc..384e6b01d6 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java @@ -38,7 +38,15 @@ import org.springframework.util.Assert; */ class WriteResultPublisher implements Publisher { - private static final Log logger = LogFactory.getLog(WriteResultPublisher.class); + /** + * Special logger for tracing Reactive Streams signals. + *

This logger is not exposed under "org.springframework" because it is + * verbose. To enable this, and other related Reactive Streams loggers in + * this package, set "spring-web.reactivestreams" to TRACE. + */ + private static final Log rsWriteResultLogger = + LogFactory.getLog("spring-web.reactivestreams.WriteResultPublisher"); + private final AtomicReference state = new AtomicReference<>(State.UNSUBSCRIBED); @@ -60,8 +68,8 @@ class WriteResultPublisher implements Publisher { @Override public final void subscribe(Subscriber subscriber) { - if (logger.isTraceEnabled()) { - logger.trace(this.logPrefix + this.state + " subscribe: " + subscriber); + if (rsWriteResultLogger.isTraceEnabled()) { + rsWriteResultLogger.trace(this.logPrefix + this.state + " subscribe: " + subscriber); } this.state.get().subscribe(this, subscriber); } @@ -70,8 +78,8 @@ class WriteResultPublisher implements Publisher { * Invoke this to delegate a completion signal to the subscriber. */ public void publishComplete() { - if (logger.isTraceEnabled()) { - logger.trace(this.logPrefix + this.state + " publishComplete"); + if (rsWriteResultLogger.isTraceEnabled()) { + rsWriteResultLogger.trace(this.logPrefix + this.state + " publishComplete"); } this.state.get().publishComplete(this); } @@ -80,8 +88,8 @@ class WriteResultPublisher implements Publisher { * Invoke this to delegate an error signal to the subscriber. */ public void publishError(Throwable t) { - if (logger.isTraceEnabled()) { - logger.trace(this.logPrefix + this.state + " publishError: " + t); + if (rsWriteResultLogger.isTraceEnabled()) { + rsWriteResultLogger.trace(this.logPrefix + this.state + " publishError: " + t); } this.state.get().publishError(this, t); } @@ -105,16 +113,16 @@ class WriteResultPublisher implements Publisher { @Override public final void request(long n) { - if (logger.isTraceEnabled()) { - logger.trace(this.publisher.logPrefix + state() + " request: " + n); + if (rsWriteResultLogger.isTraceEnabled()) { + rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " request: " + n); } state().request(this.publisher, n); } @Override public final void cancel() { - if (logger.isTraceEnabled()) { - logger.trace(this.publisher.logPrefix + state() + " cancel"); + if (rsWriteResultLogger.isTraceEnabled()) { + rsWriteResultLogger.trace(this.publisher.logPrefix + state() + " cancel"); } state().cancel(this.publisher); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 76e28dc917..ee324460fc 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -94,15 +94,10 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc DataBufferFactory bufferFactory, @Nullable MonoProcessor completionMono) { super(delegate, id, info, bufferFactory); - this.receivePublisher = new WebSocketReceivePublisher(initLogPrefix(info, id)); + this.receivePublisher = new WebSocketReceivePublisher(); this.completionMono = completionMono; } - private static String initLogPrefix(HandshakeInfo info, String id) { - return info.getLogPrefix() != null ? info.getLogPrefix() : "[" + id + "] "; - } - - protected WebSocketSendProcessor getSendProcessor() { WebSocketSendProcessor sendProcessor = this.sendProcessor; @@ -229,11 +224,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private volatile Queue pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get(); - WebSocketReceivePublisher(String logPrefix) { - super(logPrefix); - if (logger.isDebugEnabled()) { - logger.debug(getLogPrefix() + "Session id '" + getId() + "' for " + getHandshakeInfo().getUri()); - } + WebSocketReceivePublisher() { + super(AbstractListenerWebSocketSession.this.getLogPrefix()); } @@ -241,8 +233,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc protected void checkOnDataAvailable() { resumeReceiving(); int size = this.pendingMessages.size(); - if (logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "checkOnDataAvailable (" + size + " pending)"); + if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "checkOnDataAvailable (" + size + " pending)"); } if (size > 0) { onDataAvailable(); @@ -264,6 +256,9 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc if (logger.isTraceEnabled()) { logger.trace(getLogPrefix() + "Received " + message); } + else if (rsReadLogger.isTraceEnabled()) { + rsReadLogger.trace(getLogPrefix() + "Received " + message); + } if (!this.pendingMessages.offer(message)) { throw new IllegalStateException( "Too many messages. Please ensure WebSocketSession.receive() is subscribed to."); @@ -291,6 +286,9 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc if (logger.isTraceEnabled()) { logger.trace(getLogPrefix() + "Sending " + message); } + else if (rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "Sending " + message); + } return sendMessage(message); } @@ -310,8 +308,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc * async completion callback into simple flow control. */ public void setReadyToSend(boolean ready) { - if (ready && logger.isTraceEnabled()) { - logger.trace(getLogPrefix() + "Ready to send"); + if (ready && rsWriteLogger.isTraceEnabled()) { + rsWriteLogger.trace(getLogPrefix() + "Ready to send"); } this.isReady = ready; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java index 6d10effe42..a2912dce21 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractWebSocketSession.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -43,6 +45,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; */ public abstract class AbstractWebSocketSession implements WebSocketSession { + protected final Log logger = LogFactory.getLog(getClass()); + private final T delegate; private final String id; @@ -53,23 +57,32 @@ public abstract class AbstractWebSocketSession implements WebSocketSession { private final Map attributes = new ConcurrentHashMap<>(); + private final String logPrefix; + /** * Create a new WebSocket session. */ - protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo handshakeInfo, - DataBufferFactory bufferFactory) { - + protected AbstractWebSocketSession(T delegate, String id, HandshakeInfo info, DataBufferFactory bufferFactory) { Assert.notNull(delegate, "Native session is required."); Assert.notNull(id, "Session id is required."); - Assert.notNull(handshakeInfo, "HandshakeInfo is required."); + Assert.notNull(info, "HandshakeInfo is required."); Assert.notNull(bufferFactory, "DataBuffer factory is required."); this.delegate = delegate; this.id = id; - this.handshakeInfo = handshakeInfo; + this.handshakeInfo = info; this.bufferFactory = bufferFactory; - this.attributes.putAll(handshakeInfo.getAttributes()); + this.attributes.putAll(info.getAttributes()); + this.logPrefix = initLogPrefix(info, id); + + if (logger.isDebugEnabled()) { + logger.debug(getLogPrefix() + "Session id \"" + getId() + "\" for " + getHandshakeInfo().getUri()); + } + } + + private static String initLogPrefix(HandshakeInfo info, String id) { + return info.getLogPrefix() != null ? info.getLogPrefix() : "[" + id + "] "; } @@ -97,6 +110,11 @@ public abstract class AbstractWebSocketSession implements WebSocketSession { return this.attributes; } + protected String getLogPrefix() { + return this.logPrefix; + } + + @Override public abstract Flux receive(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 14b693c4de..45a1443c31 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -56,12 +56,23 @@ public class ReactorNettyWebSocketSession return getDelegate().getInbound() .aggregateFrames(DEFAULT_FRAME_MAX_SIZE) .receiveFrames() - .map(super::toMessage); + .map(super::toMessage) + .doOnNext(message -> { + if (logger.isTraceEnabled()) { + logger.trace(getLogPrefix() + "Received " + message); + } + }); } @Override public Mono send(Publisher messages) { - Flux frames = Flux.from(messages).map(this::toFrame); + Flux frames = Flux.from(messages) + .doOnNext(message -> { + if (logger.isTraceEnabled()) { + logger.trace(getLogPrefix() + "Sending " + message); + } + }) + .map(this::toFrame); return getDelegate().getOutbound() .options(NettyPipeline.SendOptions::flushOnEach) .sendObject(frames) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index 4bfcdd6d63..31125f7149 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -64,17 +64,11 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests Flux input = Flux.range(1, count).map(index -> "msg-" + index); ReplayProcessor output = ReplayProcessor.create(count); - this.client.execute(getUrl("/echo"), - session -> { - logger.debug("Starting to send messages"); - return session - .send(input.doOnNext(s -> logger.debug("outbound " + s)).map(session::textMessage)) - .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) - .subscribeWith(output) - .doOnNext(s -> logger.debug("inbound " + s)) - .then(); - }) - .doOnSuccessOrError((aVoid, ex) -> logger.debug("Done: " + (ex != null ? ex.getMessage() : "success"))) + this.client.execute(getUrl("/echo"), session -> session + .send(input.map(session::textMessage)) + .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) + .subscribeWith(output) + .then()) .block(TIMEOUT); assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT)); @@ -181,7 +175,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @Override public Mono handle(WebSocketSession session) { String protocol = session.getHandshakeInfo().getSubProtocol(); - WebSocketMessage message = session.textMessage(protocol); + WebSocketMessage message = session.textMessage(protocol != null ? protocol : "none"); return session.send(Mono.just(message)); } } diff --git a/spring-webflux/src/test/resources/log4j2-test.xml b/spring-webflux/src/test/resources/log4j2-test.xml index 2cdd748b9c..a327721075 100644 --- a/spring-webflux/src/test/resources/log4j2-test.xml +++ b/spring-webflux/src/test/resources/log4j2-test.xml @@ -9,11 +9,6 @@ - - - - -