Added a check for null message / message payload
fixes gh-861
This commit is contained in:
@@ -22,7 +22,6 @@ import java.util.Map;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.aop.support.AopUtils;
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.cloud.sleuth.Log;
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
import org.springframework.cloud.sleuth.sampler.NeverSampler;
|
||||
import org.springframework.integration.channel.DirectChannel;
|
||||
@@ -54,6 +53,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
@Override
|
||||
public void afterSendCompletion(Message<?> message, MessageChannel channel,
|
||||
boolean sent, Exception ex) {
|
||||
if (emptyMessage(message)) {
|
||||
return;
|
||||
}
|
||||
if (isDirectChannel(channel)) {
|
||||
afterMessageHandled(message, channel, null, ex);
|
||||
}
|
||||
@@ -81,20 +83,11 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean containsServerReceived(Span span) {
|
||||
if (span == null) {
|
||||
return false;
|
||||
}
|
||||
for (Log log : span.logs()) {
|
||||
if (Span.SERVER_RECV.equals(log.getEvent())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message<?> preSend(Message<?> message, MessageChannel channel) {
|
||||
if (emptyMessage(message)) {
|
||||
return message;
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Processing message before sending it to the channel");
|
||||
}
|
||||
@@ -142,6 +135,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
|
||||
private Message<?> outputMessage(Message<?> message, MessageBuilder<?> messageBuilder,
|
||||
MessageHeaderAccessor headers) {
|
||||
if (emptyMessage(message)) {
|
||||
return message;
|
||||
}
|
||||
if (message instanceof ErrorMessage) {
|
||||
headers.copyHeaders(sleuthHeaders(messageBuilder.build().getHeaders()));
|
||||
return new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders());
|
||||
@@ -183,6 +179,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
@Override
|
||||
public Message<?> beforeHandle(Message<?> message, MessageChannel channel,
|
||||
MessageHandler handler) {
|
||||
if (emptyMessage(message)) {
|
||||
return message;
|
||||
}
|
||||
Message<?> retrievedMessage = getMessage(message);
|
||||
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);
|
||||
Span spanFromHeader = getTracer().isTracing() ? getTracer().getCurrentSpan()
|
||||
@@ -206,6 +205,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
@Override
|
||||
public void afterMessageHandled(Message<?> message, MessageChannel channel,
|
||||
MessageHandler handler, Exception ex) {
|
||||
if (emptyMessage(message)) {
|
||||
return;
|
||||
}
|
||||
Span spanFromHeader = getTracer().getCurrentSpan();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Continuing span " + spanFromHeader + " after message handled");
|
||||
@@ -226,27 +228,13 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void postSend(Message<?> message, MessageChannel channel,
|
||||
boolean sent) {
|
||||
super.postSend(message, channel, sent);
|
||||
}
|
||||
|
||||
@Override public boolean preReceive(MessageChannel channel) {
|
||||
return super.preReceive(channel);
|
||||
}
|
||||
|
||||
@Override public Message<?> postReceive(Message<?> message, MessageChannel channel) {
|
||||
return super.postReceive(message, channel);
|
||||
}
|
||||
|
||||
@Override public void afterReceiveCompletion(Message<?> message,
|
||||
MessageChannel channel, Exception ex) {
|
||||
super.afterReceiveCompletion(message, channel, ex);
|
||||
}
|
||||
|
||||
private void addErrorTag(Exception ex) {
|
||||
if (ex != null) {
|
||||
getErrorParser().parseErrorTags(getTracer().getCurrentSpan(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean emptyMessage(Message<?> message) {
|
||||
return message == null || message.getPayload() == null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -69,6 +69,7 @@ import org.springframework.util.SerializationUtils;
|
||||
|
||||
import static org.assertj.core.api.BDDAssertions.then;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.springframework.cloud.sleuth.assertions.SleuthAssertions.then;
|
||||
|
||||
/**
|
||||
@@ -109,6 +110,9 @@ public class TraceChannelInterceptorTests implements MessageHandler {
|
||||
@Autowired
|
||||
private ArrayListSpanAccumulator accumulator;
|
||||
|
||||
@Autowired
|
||||
private TraceChannelInterceptor interceptor;
|
||||
|
||||
private Message<?> message;
|
||||
|
||||
private Span span;
|
||||
@@ -140,6 +144,21 @@ public class TraceChannelInterceptorTests implements MessageHandler {
|
||||
this.accumulator.getSpans().clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWorkForEmptyQueue() {
|
||||
this.interceptor.beforeHandle(null, null, null);
|
||||
this.interceptor.afterMessageHandled(null, null, null, null);
|
||||
this.interceptor.afterSendCompletion(null, null, true, null);
|
||||
this.interceptor.preSend(null, null);
|
||||
|
||||
thenNoSpanReported();
|
||||
}
|
||||
|
||||
private void thenNoSpanReported() {
|
||||
then(TestSpanContextHolder.getCurrentSpan()).isNull();
|
||||
then(this.accumulator.getSpans()).isEmpty();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldWorkForQueue() {
|
||||
this.queueChannel.send(MessageBuilder.withPayload("hi").build());
|
||||
|
||||
Reference in New Issue
Block a user