fix #646 : Filter fused flows from tracing (#648)

This commit is contained in:
Stephane Maldini
2017-07-20 01:29:47 -07:00
committed by Marcin Grzejszczak
parent 0efd29de35
commit 1d50aded1c
2 changed files with 80 additions and 2 deletions

View File

@@ -15,6 +15,8 @@ import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService;
import org.springframework.context.annotation.Configuration;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -42,8 +44,15 @@ public class TraceReactorAutoConfiguration {
@PostConstruct
public void setupHooks() {
Hooks.onNewSubscriber((pub, sub) ->
new SpanSubscriber(sub, sub.currentContext(), this.tracer, pub.toString()));
Hooks.onNewSubscriber((pub, sub) -> {
//do not trace fused flows or simple just/error/empty
if(pub instanceof Fuseable && sub instanceof Fuseable.QueueSubscription
|| pub instanceof Fuseable.ScalarCallable){
return sub;
}
return new SpanSubscriber(sub, sub.currentContext(), this.tracer, pub
.toString());
});
Schedulers.setFactory(new Schedulers.Factory() {
@Override public ScheduledExecutorService decorateScheduledExecutorService(
String schedulerType,

View File

@@ -20,7 +20,12 @@ import org.springframework.cloud.sleuth.util.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import static org.assertj.core.api.BDDAssertions.then;
@@ -60,6 +65,70 @@ public class SpanSubscriberTests {
then(ExceptionUtils.getLastException()).isNull();
}
@Test public void should_support_reactor_fusion_optimization() {
Span span = this.tracer.createSpan("foo");
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
log.info("Hello");
Mono.just(1)
.flatMap( d -> Flux.just(d + 1).collectList().map(p -> p.get(0)))
.map( d -> d + 1)
.map( (d) -> {
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
return d + 1;
})
.map( d -> d + 1)
.subscribe(System.out::println);
then(this.tracer.getCurrentSpan()).isNull();
then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId());
then(ExceptionUtils.getLastException()).isNull();
}
@Test public void should_not_trace_scalar_flows() {
Span span = this.tracer.createSpan("foo");
final AtomicReference<Subscription> spanInOperation = new AtomicReference<>();
log.info("Hello");
Mono.just(1)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
spanInOperation.set(subscription);
}
});
then(this.tracer.getCurrentSpan()).isNotNull();
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
Mono.<Integer>error(new Exception())
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
spanInOperation.set(subscription);
}
@Override
protected void hookOnError(Throwable throwable) {
}
});
then(this.tracer.getCurrentSpan()).isNotNull();
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
Mono.<Integer>empty()
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
spanInOperation.set(subscription);
}
});
then(this.tracer.getCurrentSpan()).isNotNull();
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
then(ExceptionUtils.getLastException()).isNull();
}
@Test
public void should_pass_tracing_info_when_using_reactor_async() {