From 3e7e54e09bd3dd41d9a0b3c717d0984832bfbcef Mon Sep 17 00:00:00 2001 From: Marcin Grzejszczak Date: Fri, 7 Jul 2017 11:16:17 +0200 Subject: [PATCH] Initial Reactor support --- pom.xml | 13 ++ spring-cloud-sleuth-reactor/.jdk8 | 0 spring-cloud-sleuth-reactor/pom.xml | 96 ++++++++++ .../cloud/sleuth/reactor/SpanSubscriber.java | 177 ++++++++++++++++++ .../TraceReactorAutoConfiguration.java | 60 ++++++ .../main/resources/META-INF/spring.factories | 3 + .../sleuth/reactor/SpanSubscriberTests.java | 119 ++++++++++++ .../src/test/resources/application.properties | 1 + 8 files changed, 469 insertions(+) create mode 100644 spring-cloud-sleuth-reactor/.jdk8 create mode 100644 spring-cloud-sleuth-reactor/pom.xml create mode 100644 spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/SpanSubscriber.java create mode 100644 spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/TraceReactorAutoConfiguration.java create mode 100644 spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/reactor/SpanSubscriberTests.java create mode 100644 spring-cloud-sleuth-reactor/src/test/resources/application.properties diff --git a/pom.xml b/pom.xml index cb78fa601..e6265634a 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,7 @@ spring-cloud-sleuth-core spring-cloud-sleuth-zipkin spring-cloud-sleuth-stream + spring-cloud-sleuth-reactor spring-cloud-sleuth-zipkin-stream spring-cloud-starter-sleuth spring-cloud-starter-zipkin @@ -195,6 +196,16 @@ 2.1 + + io.projectreactor + reactor-core + ${reactor.version} + + + org.reactivestreams + reactive-streams + ${reactive-streams.version} + org.hamcrest hamcrest-core @@ -238,6 +249,8 @@ 1.2.3.BUILD-SNAPSHOT Ditmars.BUILD-SNAPSHOT 1.4.0.BUILD-SNAPSHOT + 3.1.0.BUILD-SNAPSHOT + 1.0.0 diff --git a/spring-cloud-sleuth-reactor/.jdk8 b/spring-cloud-sleuth-reactor/.jdk8 new file mode 100644 index 000000000..e69de29bb diff --git a/spring-cloud-sleuth-reactor/pom.xml b/spring-cloud-sleuth-reactor/pom.xml new file mode 100644 index 000000000..f443d879e --- /dev/null +++ b/spring-cloud-sleuth-reactor/pom.xml @@ -0,0 +1,96 @@ + + + 4.0.0 + + spring-cloud-sleuth-reactor + jar + Spring Cloud Sleuth Reactor + Spring Cloud Sleuth Reactor + + + org.springframework.cloud + spring-cloud-sleuth + 1.3.0.BUILD-SNAPSHOT + .. + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + default-compile + + true + true + + 1.8 + 1.8 + + + + + default-testCompile + + true + true + + 1.8 + 1.8 + + + + + + + + + + + org.springframework.cloud + spring-cloud-sleuth-core + + + org.springframework.boot + spring-boot-starter + + + io.projectreactor + reactor-core + true + + + org.reactivestreams + reactive-streams + true + + + org.aspectj + aspectjweaver + test + + + org.assertj + assertj-core + test + + + org.awaitility + awaitility + test + + + org.springframework.boot + spring-boot-starter-test + test + + + + diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/SpanSubscriber.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/SpanSubscriber.java new file mode 100644 index 000000000..ea7d7a132 --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/SpanSubscriber.java @@ -0,0 +1,177 @@ +package org.springframework.cloud.sleuth.reactor; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.cloud.sleuth.Span; +import org.springframework.cloud.sleuth.Tracer; +import reactor.util.Logger; +import reactor.util.Loggers; +import reactor.util.context.Context; +import reactor.util.context.Contextualized; + +/** + * A trace representation of the {@link Subscriber} + * + * @author Stephane Maldini + * @author Marcin Grzejszczak + * @since 1.3.0 + */ +class SpanSubscriber extends AtomicBoolean + implements Subscriber, Subscription, Contextualized { + + private static final Logger log = Loggers.getLogger(SpanSubscriber.class); + + private final Span span; + private final Span rootSpan; + private final Subscriber subscriber; + private final Context context; + private final Tracer tracer; + private Subscription s; + + SpanSubscriber(Subscriber subscriber, Context ctx, Tracer tracer, + String name) { + this.subscriber = subscriber; + this.tracer = tracer; + Span root = ctx.getOrDefault(Span.class, tracer.getCurrentSpan()); + if (log.isTraceEnabled()) { + log.trace("Span from context [{}]", root); + } + this.rootSpan = root; + if (log.isTraceEnabled()) { + log.trace("Stored context root span [{}]", this.rootSpan); + } + this.span = tracer.createSpan(name, root); + if (log.isTraceEnabled()) { + log.trace("Created span [{}], with name [{}]", this.span, name); + } + this.context = ctx.put(Span.class, this.span); + } + + @Override public void onSubscribe(Subscription subscription) { + if (log.isTraceEnabled()) { + log.trace("On subscribe"); + } + this.s = subscription; + this.tracer.continueSpan(this.span); + if (log.isTraceEnabled()) { + log.trace("On subscribe - span continued"); + } + this.subscriber.onSubscribe(this); + } + + @Override public void request(long n) { + if (log.isTraceEnabled()) { + log.trace("Request"); + } + this.tracer.continueSpan(this.span); + if (log.isTraceEnabled()) { + log.trace("Request - continued"); + } + this.s.request(n); + // We're in the main thread so we don't want to pollute it with wrong spans + // that's why we need to detach the current one and continue with its parent + Span localRootSpan = this.span; + while (localRootSpan != null) { + if (this.rootSpan != null) { + if (localRootSpan.getSpanId() != this.rootSpan.getSpanId() && + !isRootParentSpan(localRootSpan)) { + localRootSpan = continueDetachedSpan(localRootSpan); + } else { + localRootSpan = null; + } + } else if (!isRootParentSpan(localRootSpan)) { + localRootSpan = continueDetachedSpan(localRootSpan); + } else { + localRootSpan = null; + } + } + if (log.isTraceEnabled()) { + log.trace("Request after cleaning. Current span [{}]", + this.tracer.getCurrentSpan()); + } + } + + private boolean isRootParentSpan(Span localRootSpan) { + return localRootSpan.getSpanId() == localRootSpan.getTraceId(); + } + + private Span continueDetachedSpan(Span localRootSpan) { + if (log.isTraceEnabled()) { + log.trace("Will detach span {}", localRootSpan); + } + Span detachedSpan = this.tracer.detach(localRootSpan); + return this.tracer.continueSpan(detachedSpan); + } + + @Override public void cancel() { + try { + if (log.isTraceEnabled()) { + log.trace("Cancel"); + } + this.s.cancel(); + } + finally { + cleanup(); + } + } + + @Override public void onNext(Object o) { + this.subscriber.onNext(o); + } + + @Override public void onError(Throwable throwable) { + try { + this.subscriber.onError(throwable); + } + finally { + cleanup(); + } + } + + @Override public void onComplete() { + try { + this.subscriber.onComplete(); + } + finally { + cleanup(); + } + } + + void cleanup() { + if (compareAndSet(false, true)) { + if (log.isTraceEnabled()) { + log.trace("Cleaning up"); + } + if (this.tracer.getCurrentSpan() != this.span) { + if (log.isTraceEnabled()) { + log.trace("Detaching span"); + } + this.tracer.detach(this.tracer.getCurrentSpan()); + this.tracer.continueSpan(this.span); + if (log.isTraceEnabled()) { + log.trace("Continuing span"); + } + } + if (log.isTraceEnabled()) { + log.trace("Closing span"); + } + this.tracer.close(this.span); + if (log.isTraceEnabled()) { + log.trace("Span closed"); + } + if (this.rootSpan != null) { + this.tracer.continueSpan(this.rootSpan); + this.tracer.close(this.rootSpan); + if (log.isTraceEnabled()) { + log.trace("Closed root span"); + } + } + } + } + + @Override public Context currentContext() { + return this.context; + } +} \ No newline at end of file diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/TraceReactorAutoConfiguration.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/TraceReactorAutoConfiguration.java new file mode 100644 index 000000000..ac2e1e835 --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/reactor/TraceReactorAutoConfiguration.java @@ -0,0 +1,60 @@ +package org.springframework.cloud.sleuth.reactor; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; +import javax.annotation.PostConstruct; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.cloud.sleuth.SpanNamer; +import org.springframework.cloud.sleuth.TraceKeys; +import org.springframework.cloud.sleuth.Tracer; +import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration; +import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; +import reactor.util.context.Context; + +/** + * {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration} + * to enable tracing of Reactor components via Spring Cloud Sleuth. + * + * @author Stephane Maldini + * @author Marcin Grzejszczak + * @since 1.3.0 + */ +@Configuration +@ConditionalOnProperty(value="spring.sleuth.reactor.enabled", matchIfMissing=true) +@ConditionalOnClass(Mono.class) +@AutoConfigureAfter(TraceAutoConfiguration.class) +public class TraceReactorAutoConfiguration { + + @Configuration + @ConditionalOnBean(Tracer.class) + static class TraceReactorConfiguration { + @Autowired Tracer tracer; + @Autowired TraceKeys traceKeys; + @Autowired SpanNamer spanNamer; + + @PostConstruct + public void setupHooks() { + Hooks.onNewSubscriber((pub, sub) -> + new SpanSubscriber(sub, Context.from(sub), this.tracer, pub.toString())); + Schedulers.setFactory(new Schedulers.Factory() { + @Override public ScheduledExecutorService decorateScheduledExecutorService( + String schedulerType, + Supplier actual) { + return new TraceableScheduledExecutorService(actual.get(), + TraceReactorConfiguration.this.tracer, + TraceReactorConfiguration.this.traceKeys, + TraceReactorConfiguration.this.spanNamer); + } + }); + } + } +} diff --git a/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..fcf4eae22 --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories @@ -0,0 +1,3 @@ +# Auto Configuration +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.springframework.cloud.sleuth.reactor.TraceReactorAutoConfiguration diff --git a/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/reactor/SpanSubscriberTests.java new file mode 100644 index 000000000..1be4a4798 --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/reactor/SpanSubscriberTests.java @@ -0,0 +1,119 @@ +package org.springframework.cloud.sleuth.reactor; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.awaitility.Awaitility; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.reactivestreams.Publisher; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.cloud.sleuth.Sampler; +import org.springframework.cloud.sleuth.Span; +import org.springframework.cloud.sleuth.Tracer; +import org.springframework.cloud.sleuth.sampler.AlwaysSampler; +import org.springframework.cloud.sleuth.util.ExceptionUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.test.context.junit4.SpringRunner; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import static org.assertj.core.api.BDDAssertions.then; + +@RunWith(SpringRunner.class) +@SpringBootTest(classes = SpanSubscriberTests.Config.class, + webEnvironment = SpringBootTest.WebEnvironment.NONE) +public class SpanSubscriberTests { + + private static final Log log = LogFactory.getLog(SpanSubscriberTests.class); + + @Autowired Tracer tracer; + + @Before + public void setup() { + ExceptionUtils.setFail(true); + } + + @Test public void should_pass_tracing_info_when_using_reactor() { + Span span = this.tracer.createSpan("foo"); + final AtomicReference spanInOperation = new AtomicReference<>(); + Publisher traced = Flux.just(1, 2, 3); + log.info("Hello"); + + Flux.from(traced) + .map( d -> d + 1) + .map( d -> d + 1) + .map( (d) -> { + spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); + return d + 1; + }) + .map( d -> d + 1) + .subscribe(System.out::println); + + then(this.tracer.getCurrentSpan()).isNull(); + then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId()); + then(ExceptionUtils.getLastException()).isNull(); + } + + @Test + public void should_pass_tracing_info_when_using_reactor_async() { + + Span span = this.tracer.createSpan("foo"); + final AtomicReference spanInOperation = new AtomicReference<>(); + log.info("Hello"); + + Flux.just(1, 2, 3) + .publishOn(Schedulers.single()) + .log("reactor.1") + .map( d -> d + 1) + .map( d -> d + 1) + .publishOn(Schedulers.newSingle("secondThread")) + .log("reactor.2") + .map( (d) -> { + spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); + return d + 1; + }) + .map( d -> d + 1) + .blockLast(); + + Awaitility.await().untilAsserted(() -> { + then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId()); + then(ExceptionUtils.getLastException()).isNull(); + }); + then(this.tracer.getCurrentSpan()).isEqualTo(span); + this.tracer.close(span); + + Span foo2 = this.tracer.createSpan("foo2"); + + Flux.just(1, 2, 3) + .publishOn(Schedulers.single()) + .log("reactor.") + .map( d -> d + 1) + .map( d -> d + 1) + .map( (d) -> { + spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); + return d + 1; + }) + .map( d -> d + 1) + .blockLast(); + + then(this.tracer.getCurrentSpan()).isEqualTo(foo2); + then(ExceptionUtils.getLastException()).isNull(); + // parent cause there's an async span in the meantime + then(spanInOperation.get().getSavedSpan().getParents().get(0)).isEqualTo(foo2.getSpanId()); + tracer.close(foo2); + } + + @EnableAutoConfiguration + @Configuration + static class Config { + @Bean Sampler sampler() { + return new AlwaysSampler(); + } + } +} \ No newline at end of file diff --git a/spring-cloud-sleuth-reactor/src/test/resources/application.properties b/spring-cloud-sleuth-reactor/src/test/resources/application.properties new file mode 100644 index 000000000..4508cbe6d --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/test/resources/application.properties @@ -0,0 +1 @@ +logging.level.org.springframework.cloud.sleuth.reactor=TRACE \ No newline at end of file