Correlated messages at HTTP adapter + WebSocket level

Issue: SPR-16966
This commit is contained in:
Rossen Stoyanchev
2018-07-06 12:55:32 -04:00
parent 7be2943c03
commit 5dc49b16ea
27 changed files with 393 additions and 163 deletions

View File

@@ -65,6 +65,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Nullable
private volatile Throwable errorBeforeDemand;
private final String logPrefix;
public AbstractListenerReadPublisher() {
this("");
}
/**
* Create an instance with the given log prefix.
* @since 5.1
*/
public AbstractListenerReadPublisher(String logPrefix) {
this.logPrefix = logPrefix;
}
/**
* Return the configured log message prefix.
* @since 5.1
*/
public String getLogPrefix() {
return this.logPrefix;
}
// Publisher implementation...
@@ -82,7 +106,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
* container.
*/
public final void onDataAvailable() {
logger.trace("I/O event onDataAvailable");
logger.trace(getLogPrefix() + "onDataAvailable");
this.state.get().onDataAvailable(this);
}
@@ -91,7 +115,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
* all data has been read.
*/
public void onAllDataRead() {
logger.trace("I/O event onAllDataRead");
logger.trace(getLogPrefix() + "onAllDataRead");
this.state.get().onAllDataRead(this);
}
@@ -100,7 +124,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
*/
public final void onError(Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("I/O event onError: " + ex);
logger.trace(getLogPrefix() + "Connection error: " + ex);
}
this.state.get().onError(this, ex);
}
@@ -151,13 +175,13 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
Subscriber<? super T> subscriber = this.subscriber;
Assert.state(subscriber != null, "No subscriber");
if (logger.isTraceEnabled()) {
logger.trace("Data item read, publishing..");
logger.trace(getLogPrefix() + "Publishing data read");
}
subscriber.onNext(data);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("No more data to read");
logger.trace(getLogPrefix() + "No more data to read");
}
return true;
}
@@ -168,7 +192,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
private boolean changeState(State oldState, State newState) {
boolean result = this.state.compareAndSet(oldState, newState);
if (result && logger.isTraceEnabled()) {
logger.trace(oldState + " -> " + newState);
logger.trace(getLogPrefix() + oldState + " -> " + newState);
}
return result;
}
@@ -198,7 +222,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
public final void request(long n) {
if (logger.isTraceEnabled()) {
logger.trace("Signal request(" + n + ")");
logger.trace(getLogPrefix() + n + " requested");
}
state.get().request(AbstractListenerReadPublisher.this, n);
}
@@ -206,7 +230,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
public final void cancel() {
if (logger.isTraceEnabled()) {
logger.trace("Signal cancel()");
logger.trace(getLogPrefix() + "Cancellation");
}
state.get().cancel(AbstractListenerReadPublisher.this);
}
@@ -244,14 +268,15 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
String logPrefix = publisher.getLogPrefix();
if (publisher.completionBeforeDemand) {
publisher.logger.trace("Completed before demand");
publisher.logger.trace(logPrefix + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (publisher.logger.isTraceEnabled()) {
publisher.logger.trace("Completed with error before demand: " + ex);
publisher.logger.trace(logPrefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}

View File

@@ -51,7 +51,32 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
private volatile boolean subscriberCompleted;
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final WriteResultPublisher resultPublisher;
private final String logPrefix;
public AbstractListenerWriteFlushProcessor() {
this("");
}
/**
* Create an instance with the given log prefix.
* @since 5.1
*/
public AbstractListenerWriteFlushProcessor(String logPrefix) {
this.logPrefix = logPrefix;
this.resultPublisher = new WriteResultPublisher(logPrefix);
}
/**
* Create an instance with the given log prefix.
* @since 5.1
*/
public String getLogPrefix() {
return this.logPrefix;
}
// Subscriber methods and async I/O notification methods...
@@ -63,7 +88,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public final void onNext(Publisher<? extends T> publisher) {
logger.trace("Received onNext publisher");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Received onNext publisher");
}
this.state.get().onNext(this, publisher);
}
@@ -74,7 +101,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public final void onError(Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("Received onError: " + ex);
logger.trace(getLogPrefix() + "Received onError: " + ex);
}
this.state.get().onError(this, ex);
}
@@ -85,7 +112,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
*/
@Override
public final void onComplete() {
logger.trace("Received onComplete");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Received onComplete");
}
this.state.get().onComplete(this);
}
@@ -103,7 +132,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
* container to cancel the upstream subscription.
*/
protected void cancel() {
logger.trace("Received request to cancel");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Received request to cancel");
}
if (this.subscription != null) {
this.subscription.cancel();
}
@@ -160,7 +191,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
private boolean changeState(State oldState, State newState) {
boolean result = this.state.compareAndSet(oldState, newState);
if (result && logger.isTraceEnabled()) {
logger.trace(oldState + " -> " + newState);
logger.trace(getLogPrefix() + oldState + " -> " + newState);
}
return result;
}
@@ -168,7 +199,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
private void flushIfPossible() {
boolean result = isWritePossible();
if (logger.isTraceEnabled()) {
logger.trace("isWritePossible[" + result + "]");
logger.trace(getLogPrefix() + "isWritePossible[" + result + "]");
}
if (result) {
onFlushPossible();
@@ -369,10 +400,12 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public void onComplete() {
if (this.processor.logger.isTraceEnabled()) {
this.processor.logger.trace(this.processor.state + " writeComplete");
Log logger = this.processor.logger;
AtomicReference<State> state = this.processor.state;
if (logger.isTraceEnabled()) {
logger.trace(this.processor.getLogPrefix() + state + " writeComplete");
}
this.processor.state.get().writeComplete(this.processor);
state.get().writeComplete(this.processor);
}
}
}

View File

@@ -56,7 +56,32 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private volatile boolean subscriberCompleted;
private final WriteResultPublisher resultPublisher = new WriteResultPublisher();
private final WriteResultPublisher resultPublisher;
private final String logPrefix;
public AbstractListenerWriteProcessor() {
this("");
}
/**
* Create an instance with the given log prefix.
* @since 5.1
*/
public AbstractListenerWriteProcessor(String logPrefix) {
this.logPrefix = logPrefix;
this.resultPublisher = new WriteResultPublisher(logPrefix);
}
/**
* Create an instance with the given log prefix.
* @since 5.1
*/
public String getLogPrefix() {
return this.logPrefix;
}
// Subscriber methods and async I/O notification methods...
@@ -68,7 +93,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public final void onNext(T data) {
logger.trace("Received onNext data item");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Item to write");
}
this.state.get().onNext(this, data);
}
@@ -79,7 +106,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public final void onError(Throwable ex) {
if (logger.isTraceEnabled()) {
logger.trace("Received onError: " + ex);
logger.trace(getLogPrefix() + "Write source error: " + ex);
}
this.state.get().onError(this, ex);
}
@@ -90,7 +117,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
*/
@Override
public final void onComplete() {
logger.trace("Received onComplete");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "No more items to write");
}
this.state.get().onComplete(this);
}
@@ -100,7 +129,9 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* container.
*/
public final void onWritePossible() {
logger.trace("Received onWritePossible");
if (logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "onWritePossible");
}
this.state.get().onWritePossible(this);
}
@@ -109,7 +140,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* container to cancel the upstream subscription.
*/
public void cancel() {
logger.trace("Received request to cancel");
logger.trace(getLogPrefix() + "Cancellation");
if (this.subscription != null) {
this.subscription.cancel();
}
@@ -193,7 +224,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private boolean changeState(State oldState, State newState) {
boolean result = this.state.compareAndSet(oldState, newState);
if (result && logger.isTraceEnabled()) {
logger.trace(oldState + " -> " + newState);
logger.trace(getLogPrefix() + oldState + " -> " + newState);
}
return result;
}
@@ -216,8 +247,8 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private void writeIfPossible() {
boolean result = isWritePossible();
if (logger.isTraceEnabled()) {
logger.trace("isWritePossible[" + result + "]");
if (!result && logger.isTraceEnabled()) {
logger.trace(getLogPrefix() + "Writing not possible");
}
if (result) {
onWritePossible();

View File

@@ -32,6 +32,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
@@ -62,6 +63,12 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
@Nullable
private SslInfo sslInfo;
@Nullable
private String connectionId;
@Nullable
private String logPrefix;
/**
* Constructor with the URI and headers for the request.
@@ -129,7 +136,7 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
}
catch (UnsupportedEncodingException ex) {
if (logger.isWarnEnabled()) {
logger.warn("Could not decode query param [" + value + "] as 'UTF-8'. " +
logger.warn(getLogPrefix() + "Could not decode query param [" + value + "] as 'UTF-8'. " +
"Falling back on default encoding; exception message: " + ex.getMessage());
}
return URLDecoder.decode(value);
@@ -180,12 +187,38 @@ public abstract class AbstractServerHttpRequest implements ServerHttpRequest {
public abstract <T> T getNativeRequest();
/**
* Return an id for the underlying connection, if available.
* Return an id representing the underlying connection, if available, or
* otherwise the identify of the request object.
* @since 5.1
*/
public String getConnectionId() {
if (this.connectionId == null) {
this.connectionId = initConnectionId();
if (this.connectionId == null) {
this.connectionId = ObjectUtils.getIdentityHexString(this);
}
}
return this.connectionId;
}
/**
* Obtain the connection id, if available.
* @since 5.1
*/
@Nullable
public String getConnectionId() {
protected String initConnectionId() {
return null;
}
/**
* For internal use in logging at the HTTP adapter layer.
* @since 5.1
*/
String getLogPrefix() {
if (this.logPrefix == null) {
this.logPrefix = "[" + getConnectionId() + "] ";
}
return this.logPrefix;
}
}

View File

@@ -22,8 +22,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -58,8 +56,6 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
private enum State {NEW, COMMITTING, COMMITTED}
private final Log logger = LogFactory.getLog(getClass());
private final DataBufferFactory dataBufferFactory;
@Nullable
@@ -90,9 +86,6 @@ public abstract class AbstractServerHttpResponse implements ServerHttpResponse {
@Override
public boolean setStatusCode(@Nullable HttpStatus status) {
if (this.state.get() == State.COMMITTED) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring status " + status + ": response already committed");
}
return false;
}
else {

View File

@@ -32,6 +32,7 @@ import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
@@ -191,7 +192,6 @@ class DefaultServerHttpRequestBuilder implements ServerHttpRequest.Builder {
private final ServerHttpRequest originalRequest;
@Nullable
private final String requestId;
@@ -207,7 +207,8 @@ class DefaultServerHttpRequestBuilder implements ServerHttpRequest.Builder {
this.body = body;
this.originalRequest = originalRequest;
this.requestId = originalRequest instanceof AbstractServerHttpRequest ?
((AbstractServerHttpRequest) originalRequest).getConnectionId() : null;
((AbstractServerHttpRequest) originalRequest).getConnectionId() :
ObjectUtils.getIdentityHexString(originalRequest);
}
@Override

View File

@@ -43,19 +43,21 @@ public class JettyHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected ServerHttpResponse createResponse(HttpServletResponse response,
AsyncContext context) throws IOException {
protected ServletServerHttpResponse createResponse(HttpServletResponse response,
AsyncContext context, ServletServerHttpRequest request) throws IOException {
return new JettyServerHttpResponse(response, context, getDataBufferFactory(), getBufferSize());
return new JettyServerHttpResponse(
response, context, getDataBufferFactory(), getBufferSize(), request);
}
private static final class JettyServerHttpResponse extends ServletServerHttpResponse {
public JettyServerHttpResponse(HttpServletResponse response, AsyncContext context,
DataBufferFactory factory, int bufferSize) throws IOException {
public JettyServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request)
throws IOException {
super(response, context, factory, bufferSize);
super(response, asyncContext, bufferFactory, bufferSize, request);
}
@Override

View File

@@ -68,13 +68,15 @@ public class ReactorHttpHandlerAdapter implements BiFunction<HttpServerRequest,
return Mono.empty();
}
String logPrefix = ((ReactorServerHttpRequest) adaptedRequest).getLogPrefix();
if (adaptedRequest.getMethod() == HttpMethod.HEAD) {
adaptedResponse = new HttpHeadResponseDecorator(adaptedResponse);
}
return this.httpHandler.handle(adaptedRequest, adaptedResponse)
.doOnError(ex -> logger.trace("Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace("Handling completed"));
.doOnError(ex -> logger.trace(logPrefix + "Failed to complete: " + ex.getMessage()))
.doOnSuccess(aVoid -> logger.trace(logPrefix + "Handling completed"));
}
}

View File

@@ -175,7 +175,8 @@ class ReactorServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public String getConnectionId() {
@Nullable
protected String initConnectionId() {
return this.request instanceof Connection ?
((Connection) this.request).channel().id().asShortText() : null;
}

View File

@@ -166,7 +166,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(-1);
ServerHttpRequest httpRequest;
ServletServerHttpRequest httpRequest;
try {
httpRequest = createRequest(((HttpServletRequest) request), asyncContext);
}
@@ -178,21 +178,23 @@ public class ServletHttpHandlerAdapter implements Servlet {
asyncContext.complete();
return;
}
ServerHttpResponse httpResponse = createResponse(((HttpServletResponse) response), asyncContext);
ServerHttpResponse httpResponse =
createResponse(((HttpServletResponse) response), asyncContext, httpRequest);
if (httpRequest.getMethod() == HttpMethod.HEAD) {
httpResponse = new HttpHeadResponseDecorator(httpResponse);
}
AtomicBoolean isCompleted = new AtomicBoolean();
HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted);
HandlerResultAsyncListener listener = new HandlerResultAsyncListener(isCompleted, httpRequest);
asyncContext.addListener(listener);
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted);
HandlerResultSubscriber subscriber = new HandlerResultSubscriber(asyncContext, isCompleted, httpRequest);
this.httpHandler.handle(httpRequest, httpResponse).subscribe(subscriber);
}
protected ServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context)
protected ServletServerHttpRequest createRequest(HttpServletRequest request, AsyncContext context)
throws IOException, URISyntaxException {
Assert.notNull(this.servletPath, "Servlet path is not initialized");
@@ -200,8 +202,10 @@ public class ServletHttpHandlerAdapter implements Servlet {
request, context, this.servletPath, getDataBufferFactory(), getBufferSize());
}
protected ServerHttpResponse createResponse(HttpServletResponse response, AsyncContext context) throws IOException {
return new ServletServerHttpResponse(response, context, getDataBufferFactory(), getBufferSize());
protected ServletServerHttpResponse createResponse(HttpServletResponse response,
AsyncContext context, ServletServerHttpRequest request) throws IOException {
return new ServletServerHttpResponse(response, context, getDataBufferFactory(), getBufferSize(), request);
}
@Override
@@ -241,13 +245,17 @@ public class ServletHttpHandlerAdapter implements Servlet {
private final AtomicBoolean isCompleted;
public HandlerResultAsyncListener(AtomicBoolean isCompleted) {
private final String logPrefix;
public HandlerResultAsyncListener(AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) {
this.isCompleted = isCompleted;
this.logPrefix = httpRequest.getLogPrefix();
}
@Override
public void onTimeout(AsyncEvent event) {
logger.debug("Timeout notification");
logger.debug(this.logPrefix + "Timeout notification");
AsyncContext context = event.getAsyncContext();
runIfAsyncNotComplete(context, this.isCompleted, context::complete);
}
@@ -255,7 +263,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
@Override
public void onError(AsyncEvent event) {
Throwable ex = event.getThrowable();
logger.debug("Error notification: " + (ex != null ? ex : "<no Throwable>"));
logger.debug(this.logPrefix + "Error notification: " + (ex != null ? ex : "<no Throwable>"));
AsyncContext context = event.getAsyncContext();
runIfAsyncNotComplete(context, this.isCompleted, context::complete);
}
@@ -278,9 +286,15 @@ public class ServletHttpHandlerAdapter implements Servlet {
private final AtomicBoolean isCompleted;
public HandlerResultSubscriber(AsyncContext asyncContext, AtomicBoolean isCompleted) {
private final String logPrefix;
public HandlerResultSubscriber(
AsyncContext asyncContext, AtomicBoolean isCompleted, ServletServerHttpRequest httpRequest) {
this.asyncContext = asyncContext;
this.isCompleted = isCompleted;
this.logPrefix = httpRequest.getLogPrefix();
}
@Override
@@ -295,16 +309,16 @@ public class ServletHttpHandlerAdapter implements Servlet {
@Override
public void onError(Throwable ex) {
logger.trace("Failed to complete: " + ex.getMessage());
logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage());
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, () -> {
if (this.asyncContext.getResponse().isCommitted()) {
logger.trace("Dispatch to container, to raise the error on servlet thread");
logger.trace(this.logPrefix + "Dispatch to container, to raise the error on servlet thread");
this.asyncContext.getRequest().setAttribute(WRITE_ERROR_ATTRIBUTE_NAME, ex);
this.asyncContext.dispatch();
}
else {
try {
logger.trace("Setting ServletResponse status to 500 Server Error");
logger.trace(this.logPrefix + "Setting ServletResponse status to 500 Server Error");
this.asyncContext.getResponse().resetBuffer();
((HttpServletResponse) this.asyncContext.getResponse()).setStatus(500);
}
@@ -317,7 +331,7 @@ public class ServletHttpHandlerAdapter implements Servlet {
@Override
public void onComplete() {
logger.trace("Handling completed");
logger.trace(this.logPrefix + "Handling completed");
runIfAsyncNotComplete(this.asyncContext, this.isCompleted, this.asyncContext::complete);
}
}

View File

@@ -204,7 +204,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
DataBuffer readFromInputStream() throws IOException {
int read = this.request.getInputStream().read(this.buffer);
if (logger.isTraceEnabled()) {
logger.trace("InputStream.read returned " + read + (read != -1 ? " bytes" : ""));
logger.trace(getLogPrefix() + "InputStream.read returned " + read + (read != -1 ? " bytes" : ""));
}
if (read > 0) {
@@ -256,6 +256,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
private final ServletInputStream inputStream;
public RequestBodyPublisher(ServletInputStream inputStream) {
super(ServletServerHttpRequest.this.getLogPrefix());
this.inputStream = inputStream;
}

View File

@@ -60,9 +60,11 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private volatile boolean flushOnNext;
private final ServletServerHttpRequest request;
public ServletServerHttpResponse(HttpServletResponse response, AsyncContext asyncContext,
DataBufferFactory bufferFactory, int bufferSize) throws IOException {
DataBufferFactory bufferFactory, int bufferSize, ServletServerHttpRequest request) throws IOException {
super(bufferFactory);
@@ -73,6 +75,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
this.response = response;
this.outputStream = response.getOutputStream();
this.bufferSize = bufferSize;
this.request = request;
asyncContext.addListener(new ResponseAsyncListener());
@@ -265,6 +268,10 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor<DataBuffer> {
public ResponseBodyFlushProcessor() {
super(request.getLogPrefix());
}
@Override
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
ResponseBodyProcessor processor = new ResponseBodyProcessor();
@@ -275,7 +282,7 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
@Override
protected void flush() throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("flush");
logger.trace(getLogPrefix() + "flush");
}
ServletServerHttpResponse.this.flush();
}
@@ -294,6 +301,11 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
private class ResponseBodyProcessor extends AbstractListenerWriteProcessor<DataBuffer> {
public ResponseBodyProcessor() {
super(request.getLogPrefix());
}
@Override
protected boolean isWritePossible() {
return ServletServerHttpResponse.this.isWritePossible();
@@ -308,23 +320,23 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
protected boolean write(DataBuffer dataBuffer) throws IOException {
if (ServletServerHttpResponse.this.flushOnNext) {
if (logger.isTraceEnabled()) {
logger.trace("flush");
logger.trace(getLogPrefix() + "flush");
}
flush();
}
boolean ready = ServletServerHttpResponse.this.isWritePossible();
if (logger.isTraceEnabled()) {
logger.trace("write: " + dataBuffer + " ready: " + ready);
logger.trace(getLogPrefix() + "write: " + dataBuffer + " ready: " + ready);
}
int remaining = dataBuffer.readableByteCount();
if (ready && remaining > 0) {
int written = writeToOutputStream(dataBuffer);
if (logger.isTraceEnabled()) {
logger.trace("written: " + written + " total: " + remaining);
logger.trace(getLogPrefix() + "written: " + written + " total: " + remaining);
}
if (written == remaining) {
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + dataBuffer);
logger.trace(getLogPrefix() + "releaseData: " + dataBuffer);
}
DataBufferUtils.release(dataBuffer);
return true;

View File

@@ -50,19 +50,20 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
@Override
protected ServerHttpRequest createRequest(HttpServletRequest request, AsyncContext asyncContext)
protected ServletServerHttpRequest createRequest(HttpServletRequest request, AsyncContext asyncContext)
throws IOException, URISyntaxException {
Assert.notNull(getServletPath(), "servletPath is not initialized.");
return new TomcatServerHttpRequest(request, asyncContext, getServletPath(),
getDataBufferFactory(), getBufferSize());
return new TomcatServerHttpRequest(
request, asyncContext, getServletPath(), getDataBufferFactory(), getBufferSize());
}
@Override
protected ServerHttpResponse createResponse(HttpServletResponse response, AsyncContext cxt)
throws IOException {
protected ServletServerHttpResponse createResponse(HttpServletResponse response,
AsyncContext asyncContext, ServletServerHttpRequest request) throws IOException {
return new TomcatServerHttpResponse(response, cxt, getDataBufferFactory(), getBufferSize());
return new TomcatServerHttpResponse(
response, asyncContext, getDataBufferFactory(), getBufferSize(), request);
}
@@ -86,7 +87,7 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
ServletRequest request = getNativeRequest();
int read = ((CoyoteInputStream) request.getInputStream()).read(byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("read:" + read);
logger.trace(getLogPrefix() + "read:" + read);
}
if (read > 0) {
@@ -113,9 +114,9 @@ public class TomcatHttpHandlerAdapter extends ServletHttpHandlerAdapter {
private static final class TomcatServerHttpResponse extends ServletServerHttpResponse {
public TomcatServerHttpResponse(HttpServletResponse response, AsyncContext context,
DataBufferFactory factory, int bufferSize) throws IOException {
DataBufferFactory factory, int bufferSize, ServletServerHttpRequest request) throws IOException {
super(response, context, factory, bufferSize);
super(response, context, factory, bufferSize, request);
}
@Override

View File

@@ -66,7 +66,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void handleRequest(HttpServerExchange exchange) {
ServerHttpRequest request = null;
UndertowServerHttpRequest request = null;
try {
request = new UndertowServerHttpRequest(exchange, getDataBufferFactory());
}
@@ -77,13 +77,13 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
exchange.setStatusCode(400);
return;
}
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory());
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, getDataBufferFactory(), request);
if (request.getMethod() == HttpMethod.HEAD) {
response = new HttpHeadResponseDecorator(response);
}
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange);
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(exchange, request);
this.httpHandler.handle(request, response).subscribe(resultSubscriber);
}
@@ -92,8 +92,12 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
private final HttpServerExchange exchange;
public HandlerResultSubscriber(HttpServerExchange exchange) {
private final String logPrefix;
public HandlerResultSubscriber(HttpServerExchange exchange, UndertowServerHttpRequest request) {
this.exchange = exchange;
this.logPrefix = request.getLogPrefix();
}
@Override
@@ -108,10 +112,10 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onError(Throwable ex) {
logger.trace("Failed to complete: " + ex.getMessage());
logger.trace(this.logPrefix + "Failed to complete: " + ex.getMessage());
if (this.exchange.isResponseStarted()) {
try {
logger.debug("Closing connection");
logger.debug(this.logPrefix + "Closing connection");
this.exchange.getConnection().close();
}
catch (IOException ex2) {
@@ -119,7 +123,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
}
}
else {
logger.debug("Setting HttpServerExchange status to 500 Server Error");
logger.debug(this.logPrefix + "Setting HttpServerExchange status to 500 Server Error");
this.exchange.setStatusCode(500);
this.exchange.endExchange();
}
@@ -127,7 +131,7 @@ public class UndertowHttpHandlerAdapter implements io.undertow.server.HttpHandle
@Override
public void onComplete() {
logger.trace("Handling completed");
logger.trace(this.logPrefix + "Handling completed");
this.exchange.endExchange();
}
}

View File

@@ -60,17 +60,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
private final RequestBodyPublisher body;
private final String connectionId;
public UndertowServerHttpRequest(HttpServerExchange exchange, DataBufferFactory bufferFactory)
throws URISyntaxException {
super(initUri(exchange), "", initHeaders(exchange));
this.exchange = exchange;
this.body = new RequestBodyPublisher(exchange, bufferFactory);
this.body = new RequestBodyPublisher(exchange, bufferFactory, getLogPrefix());
this.body.registerListeners(exchange);
this.connectionId = ObjectUtils.getIdentityHexString(this.exchange.getConnection());
}
private static URI initUri(HttpServerExchange exchange) throws URISyntaxException {
@@ -132,8 +129,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
public String getConnectionId() {
return this.connectionId;
protected String initConnectionId() {
return ObjectUtils.getIdentityHexString(this.exchange.getConnection());
}
@@ -145,7 +142,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
private final ByteBufferPool byteBufferPool;
public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory bufferFactory, String logPrefix) {
super(logPrefix);
this.channel = exchange.getRequestChannel();
this.bufferFactory = bufferFactory;
this.byteBufferPool = exchange.getConnection().getByteBufferPool();
@@ -183,7 +181,7 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
int read = this.channel.read(byteBuffer);
if (logger.isTraceEnabled()) {
logger.trace("Channel.read returned " + read + (read != -1 ? " bytes" : ""));
logger.trace(getLogPrefix() + "Channel.read returned " + read + (read != -1 ? " bytes" : ""));
}
if (read > 0) {

View File

@@ -52,14 +52,19 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
private final HttpServerExchange exchange;
private final UndertowServerHttpRequest request;
@Nullable
private StreamSinkChannel responseChannel;
public UndertowServerHttpResponse(HttpServerExchange exchange, DataBufferFactory bufferFactory) {
public UndertowServerHttpResponse(
HttpServerExchange exchange, DataBufferFactory bufferFactory, UndertowServerHttpRequest request) {
super(bufferFactory);
Assert.notNull(exchange, "HttpServerExchange must not be null");
this.exchange = exchange;
this.request = request;
}
@@ -146,6 +151,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
public ResponseBodyProcessor(StreamSinkChannel channel) {
super(request.getLogPrefix());
Assert.notNull(channel, "StreamSinkChannel must not be null");
this.channel = channel;
this.channel.getWriteSetter().set(c -> {
@@ -168,7 +174,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("write: " + dataBuffer);
logger.trace(getLogPrefix() + "write (" + dataBuffer.readableByteCount() + " bytes)");
}
// Track write listener calls from here on..
@@ -178,7 +184,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
int written = writeByteBuffer(buffer);
if (logger.isTraceEnabled()) {
logger.trace("written: " + written + " total: " + total);
logger.trace(getLogPrefix() + "written " + written + ", " + total + " remaining");
}
if (written != total) {
return false;
@@ -188,7 +194,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
this.writePossible = true;
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + dataBuffer);
logger.trace(getLogPrefix() + "releaseData: " + dataBuffer);
}
DataBufferUtils.release(dataBuffer);
this.byteBuffer = null;
@@ -233,6 +239,10 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
private class ResponseBodyFlushProcessor extends AbstractListenerWriteFlushProcessor<DataBuffer> {
public ResponseBodyFlushProcessor() {
super(request.getLogPrefix());
}
@Override
protected Processor<? super DataBuffer, Void> createWriteProcessor() {
return UndertowServerHttpResponse.this.createBodyProcessor();
@@ -243,7 +253,7 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
StreamSinkChannel channel = UndertowServerHttpResponse.this.responseChannel;
if (channel != null) {
if (logger.isTraceEnabled()) {
logger.trace("flush");
logger.trace(getLogPrefix() + "flush");
}
channel.flush();
}

View File

@@ -50,11 +50,18 @@ class WriteResultPublisher implements Publisher<Void> {
@Nullable
private volatile Throwable errorBeforeSubscribed;
private final String logPrefix;
public WriteResultPublisher(String logPrefix) {
this.logPrefix = logPrefix;
}
@Override
public final void subscribe(Subscriber<? super Void> subscriber) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " subscribe: " + subscriber);
logger.trace(this.logPrefix + this.state + " subscribe: " + subscriber);
}
this.state.get().subscribe(this, subscriber);
}
@@ -64,7 +71,7 @@ class WriteResultPublisher implements Publisher<Void> {
*/
public void publishComplete() {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " publishComplete");
logger.trace(this.logPrefix + this.state + " publishComplete");
}
this.state.get().publishComplete(this);
}
@@ -74,7 +81,7 @@ class WriteResultPublisher implements Publisher<Void> {
*/
public void publishError(Throwable t) {
if (logger.isTraceEnabled()) {
logger.trace(this.state + " publishError: " + t);
logger.trace(this.logPrefix + this.state + " publishError: " + t);
}
this.state.get().publishError(this, t);
}
@@ -99,7 +106,7 @@ class WriteResultPublisher implements Publisher<Void> {
@Override
public final void request(long n) {
if (logger.isTraceEnabled()) {
logger.trace(state() + " request: " + n);
logger.trace(this.publisher.logPrefix + state() + " request: " + n);
}
state().request(this.publisher, n);
}
@@ -107,7 +114,7 @@ class WriteResultPublisher implements Publisher<Void> {
@Override
public final void cancel() {
if (logger.isTraceEnabled()) {
logger.trace(state() + " cancel");
logger.trace(this.publisher.logPrefix + state() + " cancel");
}
state().cancel(this.publisher);
}

View File

@@ -231,14 +231,8 @@ public class HttpWebHandlerAdapter extends WebHandlerDecorator implements HttpHa
}
private String initLogId(ServerHttpRequest request) {
String logId = null;
if (request instanceof AbstractServerHttpRequest) {
logId = ((AbstractServerHttpRequest) request).getConnectionId();
}
if (logId == null) {
logId = ObjectUtils.getIdentityHexString(request);
}
return logId;
return request instanceof AbstractServerHttpRequest ?
((AbstractServerHttpRequest) request).getConnectionId() : ObjectUtils.getIdentityHexString(request);
}
private void logExchange(ServerWebExchange exchange) {