diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java index 3b354022e..28a7c3611 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java @@ -22,7 +22,6 @@ import java.util.Map; import org.apache.commons.logging.LogFactory; import org.springframework.aop.support.AopUtils; import org.springframework.beans.factory.BeanFactory; -import org.springframework.cloud.sleuth.Log; import org.springframework.cloud.sleuth.Span; import org.springframework.cloud.sleuth.sampler.NeverSampler; import org.springframework.integration.channel.DirectChannel; @@ -54,6 +53,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { @Override public void afterSendCompletion(Message message, MessageChannel channel, boolean sent, Exception ex) { + if (emptyMessage(message)) { + return; + } if (isDirectChannel(channel)) { afterMessageHandled(message, channel, null, ex); } @@ -81,20 +83,11 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } } - private boolean containsServerReceived(Span span) { - if (span == null) { - return false; - } - for (Log log : span.logs()) { - if (Span.SERVER_RECV.equals(log.getEvent())) { - return true; - } - } - return false; - } - @Override public Message preSend(Message message, MessageChannel channel) { + if (emptyMessage(message)) { + return message; + } if (log.isDebugEnabled()) { log.debug("Processing message before sending it to the channel"); } @@ -142,6 +135,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { private Message outputMessage(Message message, MessageBuilder messageBuilder, MessageHeaderAccessor headers) { + if (emptyMessage(message)) { + return message; + } if (message instanceof ErrorMessage) { headers.copyHeaders(sleuthHeaders(messageBuilder.build().getHeaders())); return new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()); @@ -183,6 +179,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { @Override public Message beforeHandle(Message message, MessageChannel channel, MessageHandler handler) { + if (emptyMessage(message)) { + return message; + } Message retrievedMessage = getMessage(message); MessageBuilder messageBuilder = MessageBuilder.fromMessage(retrievedMessage); Span spanFromHeader = getTracer().isTracing() ? getTracer().getCurrentSpan() @@ -206,6 +205,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { @Override public void afterMessageHandled(Message message, MessageChannel channel, MessageHandler handler, Exception ex) { + if (emptyMessage(message)) { + return; + } Span spanFromHeader = getTracer().getCurrentSpan(); if (log.isDebugEnabled()) { log.debug("Continuing span " + spanFromHeader + " after message handled"); @@ -226,27 +228,13 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } } - @Override public void postSend(Message message, MessageChannel channel, - boolean sent) { - super.postSend(message, channel, sent); - } - - @Override public boolean preReceive(MessageChannel channel) { - return super.preReceive(channel); - } - - @Override public Message postReceive(Message message, MessageChannel channel) { - return super.postReceive(message, channel); - } - - @Override public void afterReceiveCompletion(Message message, - MessageChannel channel, Exception ex) { - super.afterReceiveCompletion(message, channel, ex); - } - private void addErrorTag(Exception ex) { if (ex != null) { getErrorParser().parseErrorTags(getTracer().getCurrentSpan(), ex); } } + + private boolean emptyMessage(Message message) { + return message == null || message.getPayload() == null; + } } diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptorTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptorTests.java index f9db4af6d..97b5b7170 100644 --- a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptorTests.java +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptorTests.java @@ -69,6 +69,7 @@ import org.springframework.util.SerializationUtils; import static org.assertj.core.api.BDDAssertions.then; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.springframework.cloud.sleuth.assertions.SleuthAssertions.then; /** @@ -109,6 +110,9 @@ public class TraceChannelInterceptorTests implements MessageHandler { @Autowired private ArrayListSpanAccumulator accumulator; + @Autowired + private TraceChannelInterceptor interceptor; + private Message message; private Span span; @@ -140,6 +144,21 @@ public class TraceChannelInterceptorTests implements MessageHandler { this.accumulator.getSpans().clear(); } + @Test + public void shouldWorkForEmptyQueue() { + this.interceptor.beforeHandle(null, null, null); + this.interceptor.afterMessageHandled(null, null, null, null); + this.interceptor.afterSendCompletion(null, null, true, null); + this.interceptor.preSend(null, null); + + thenNoSpanReported(); + } + + private void thenNoSpanReported() { + then(TestSpanContextHolder.getCurrentSpan()).isNull(); + then(this.accumulator.getSpans()).isEmpty(); + } + @Test public void shouldWorkForQueue() { this.queueChannel.send(MessageBuilder.withPayload("hi").build());