Fixes DirectChannel support (#860)

without this change for DirectChannel `beforeHandle` and `afterHandle` were not called at all
with this change we're removing any hacks and we're executing those methods

fixes gh-821
This commit is contained in:
Marcin Grzejszczak
2018-02-14 23:07:51 +01:00
committed by GitHub
parent 68dc29a0ed
commit 20891321a3
2 changed files with 48 additions and 10 deletions

View File

@@ -20,16 +20,17 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Log;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.sampler.NeverSampler;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
@@ -53,6 +54,9 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
@Override
public void afterSendCompletion(Message<?> message, MessageChannel channel,
boolean sent, Exception ex) {
if (isDirectChannel(channel)) {
afterMessageHandled(message, channel, null, ex);
}
Message<?> retrievedMessage = getMessage(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);
Span currentSpan = getTracer().isTracing() ? getTracer().getCurrentSpan()
@@ -61,13 +65,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
log.debug("Completed sending and current span is " + currentSpan);
}
getTracer().continueSpan(currentSpan);
if (containsServerReceived(currentSpan)) {
if (log.isDebugEnabled()) {
log.debug("Marking span with server send");
}
currentSpan.logEvent(Span.SERVER_SEND);
}
else if (currentSpan != null) {
if (currentSpan != null) {
if (log.isDebugEnabled()) {
log.debug("Marking span with client received");
}
@@ -130,6 +128,20 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
getSpanInjector().inject(span, new MessagingTextMap(messageBuilder));
MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message);
Message<?> outputMessage = outputMessage(message, messageBuilder, headers);
if (isDirectChannel(channel)) {
beforeHandle(outputMessage, channel, null);
}
return outputMessage;
}
private boolean isDirectChannel(MessageChannel channel) {
return DirectChannel.class
.isAssignableFrom(AopUtils.getTargetClass(channel));
}
private Message<?> outputMessage(Message<?> message, MessageBuilder<?> messageBuilder,
MessageHeaderAccessor headers) {
if (message instanceof ErrorMessage) {
headers.copyHeaders(sleuthHeaders(messageBuilder.build().getHeaders()));
return new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders());
@@ -206,7 +218,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
addErrorTag(ex);
}
// related to #447
if (getTracer().isTracing()) {
if (getTracer().isTracing() && !(isDirectChannel(channel))) {
getTracer().detach(spanFromHeader);
if (log.isDebugEnabled()) {
log.debug("Detached " + spanFromHeader + " from current thread");
@@ -214,6 +226,24 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
}
@Override public void postSend(Message<?> message, MessageChannel channel,
boolean sent) {
super.postSend(message, channel, sent);
}
@Override public boolean preReceive(MessageChannel channel) {
return super.preReceive(channel);
}
@Override public Message<?> postReceive(Message<?> message, MessageChannel channel) {
return super.postReceive(message, channel);
}
@Override public void afterReceiveCompletion(Message<?> message,
MessageChannel channel, Exception ex) {
super.afterReceiveCompletion(message, channel, ex);
}
private void addErrorTag(Exception ex) {
if (ex != null) {
getErrorParser().parseErrorTags(getTracer().getCurrentSpan(), ex);

View File

@@ -17,6 +17,7 @@
package org.springframework.cloud.sleuth.instrument.messaging;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -32,6 +33,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.sleuth.Log;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
@@ -194,6 +196,12 @@ public class TraceChannelInterceptorTests implements MessageHandler {
then(traceId).isEqualTo(10L);
then(spanId).isNotEqualTo(20L);
then(this.accumulator.getSpans()).hasSize(1);
List<Log> logs = this.accumulator.getSpans().get(0).logs();
then(logs).hasSize(4);
then(logs.get(0).getEvent()).isEqualTo(Span.CLIENT_SEND);
then(logs.get(1).getEvent()).isEqualTo(Span.SERVER_RECV);
then(logs.get(2).getEvent()).isEqualTo(Span.SERVER_SEND);
then(logs.get(3).getEvent()).isEqualTo(Span.CLIENT_RECV);
}
@Test