diff --git a/pom.xml b/pom.xml
index 73b7b07f6..408b2fe15 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,7 @@
spring-cloud-sleuth-core
spring-cloud-sleuth-zipkin
spring-cloud-sleuth-stream
+ spring-cloud-sleuth-reactor
spring-cloud-sleuth-zipkin-stream
spring-cloud-starter-sleuth
spring-cloud-starter-zipkin
@@ -195,6 +196,16 @@
2.1
+
+ io.projectreactor
+ reactor-core
+ ${reactor.version}
+
+
+ org.reactivestreams
+ reactive-streams
+ ${reactive-streams.version}
+
org.hamcrest
hamcrest-core
@@ -232,6 +243,8 @@
1.3.0.BUILD-SNAPSHOT
Ditmars.BUILD-SNAPSHOT
1.4.0.BUILD-SNAPSHOT
+ 3.1.0.RELEASE
+ 1.0.0
diff --git a/spring-cloud-sleuth-dependencies/pom.xml b/spring-cloud-sleuth-dependencies/pom.xml
index cee1c2550..57f966d32 100644
--- a/spring-cloud-sleuth-dependencies/pom.xml
+++ b/spring-cloud-sleuth-dependencies/pom.xml
@@ -24,6 +24,11 @@
spring-cloud-sleuth-core
${project.version}
+
+ org.springframework.cloud
+ spring-cloud-sleuth-reactor
+ ${project.version}
+
org.springframework.cloud
spring-cloud-sleuth-zipkin
diff --git a/spring-cloud-sleuth-reactor/.jdk8 b/spring-cloud-sleuth-reactor/.jdk8
new file mode 100644
index 000000000..e69de29bb
diff --git a/spring-cloud-sleuth-reactor/pom.xml b/spring-cloud-sleuth-reactor/pom.xml
new file mode 100644
index 000000000..f443d879e
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/pom.xml
@@ -0,0 +1,96 @@
+
+
+ 4.0.0
+
+ spring-cloud-sleuth-reactor
+ jar
+ Spring Cloud Sleuth Reactor
+ Spring Cloud Sleuth Reactor
+
+
+ org.springframework.cloud
+ spring-cloud-sleuth
+ 1.3.0.BUILD-SNAPSHOT
+ ..
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+
+ 1.8
+ 1.8
+
+
+
+ default-compile
+
+ true
+ true
+
+ 1.8
+ 1.8
+
+
+
+
+ default-testCompile
+
+ true
+ true
+
+ 1.8
+ 1.8
+
+
+
+
+
+
+
+
+
+
+ org.springframework.cloud
+ spring-cloud-sleuth-core
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+ io.projectreactor
+ reactor-core
+ true
+
+
+ org.reactivestreams
+ reactive-streams
+ true
+
+
+ org.aspectj
+ aspectjweaver
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+ org.awaitility
+ awaitility
+ test
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java
new file mode 100644
index 000000000..d8c6b244a
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java
@@ -0,0 +1,52 @@
+package org.springframework.cloud.sleuth.instrument.reactor;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import org.reactivestreams.Publisher;
+import org.springframework.cloud.sleuth.Tracer;
+
+import reactor.core.Fuseable;
+import reactor.core.Scannable;
+import reactor.core.publisher.Operators;
+
+/**
+ * Reactive Span pointcuts factories
+ *
+ * @author Stephane Maldini
+ * @since 1.3.0
+ */
+public abstract class ReactorSleuth {
+
+ /**
+ * Return a span operator pointcut given a {@link Tracer}. This can be used in reactor
+ * via {@link reactor.core.publisher.Flux#transform(Function)}, {@link
+ * reactor.core.publisher.Mono#transform(Function)}, {@link
+ * reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link
+ * reactor.core.publisher.Hooks#onLastOperator(Function)}.
+ *
+ * @param tracer the {@link Tracer} instance to use in this span operator
+ * @param an arbitrary type that is left unchanged by the span operator
+ *
+ * @return a new Span operator pointcut
+ */
+ public static Function super Publisher, ? extends Publisher> spanOperator(Tracer tracer) {
+ return Operators.lift(POINTCUT_FILTER, ((scannable, sub) -> {
+ //do not trace fused flows
+ if(scannable instanceof Fuseable && sub instanceof Fuseable.QueueSubscription){
+ return sub;
+ }
+ return new SpanSubscriber<>(
+ sub,
+ sub.currentContext(),
+ tracer,
+ scannable.name());
+ }));
+ }
+
+ private static final Predicate POINTCUT_FILTER =
+ s -> !(s instanceof Fuseable.ScalarCallable);
+
+ private ReactorSleuth() {
+ }
+}
diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java
new file mode 100644
index 000000000..f401c919f
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java
@@ -0,0 +1,177 @@
+package org.springframework.cloud.sleuth.instrument.reactor;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.springframework.cloud.sleuth.Span;
+import org.springframework.cloud.sleuth.Tracer;
+import reactor.core.CoreSubscriber;
+import reactor.util.Logger;
+import reactor.util.Loggers;
+import reactor.util.context.Context;
+
+/**
+ * A trace representation of the {@link Subscriber}
+ *
+ * @author Stephane Maldini
+ * @author Marcin Grzejszczak
+ * @since 1.3.0
+ */
+final class SpanSubscriber extends AtomicBoolean implements Subscription,
+ CoreSubscriber {
+
+ private static final Logger log = Loggers.getLogger(SpanSubscriber.class);
+
+ private final Span span;
+ private final Span rootSpan;
+ private final Subscriber super T> subscriber;
+ private final Context context;
+ private final Tracer tracer;
+ private Subscription s;
+
+ SpanSubscriber(Subscriber super T> subscriber, Context ctx, Tracer tracer,
+ String name) {
+ this.subscriber = subscriber;
+ this.tracer = tracer;
+ Span root = ctx.getOrDefault(Span.class, tracer.getCurrentSpan());
+ if (log.isTraceEnabled()) {
+ log.trace("Span from context [{}]", root);
+ }
+ this.rootSpan = root;
+ if (log.isTraceEnabled()) {
+ log.trace("Stored context root span [{}]", this.rootSpan);
+ }
+ this.span = tracer.createSpan(name, root);
+ if (log.isTraceEnabled()) {
+ log.trace("Created span [{}], with name [{}]", this.span, name);
+ }
+ this.context = ctx.put(Span.class, this.span);
+ }
+
+ @Override public void onSubscribe(Subscription subscription) {
+ if (log.isTraceEnabled()) {
+ log.trace("On subscribe");
+ }
+ this.s = subscription;
+ this.tracer.continueSpan(this.span);
+ if (log.isTraceEnabled()) {
+ log.trace("On subscribe - span continued");
+ }
+ this.subscriber.onSubscribe(this);
+ }
+
+ @Override public void request(long n) {
+ if (log.isTraceEnabled()) {
+ log.trace("Request");
+ }
+ this.tracer.continueSpan(this.span);
+ if (log.isTraceEnabled()) {
+ log.trace("Request - continued");
+ }
+ this.s.request(n);
+ // We're in the main thread so we don't want to pollute it with wrong spans
+ // that's why we need to detach the current one and continue with its parent
+ Span localRootSpan = this.span;
+ while (localRootSpan != null) {
+ if (this.rootSpan != null) {
+ if (localRootSpan.getSpanId() != this.rootSpan.getSpanId() &&
+ !isRootParentSpan(localRootSpan)) {
+ localRootSpan = continueDetachedSpan(localRootSpan);
+ } else {
+ localRootSpan = null;
+ }
+ } else if (!isRootParentSpan(localRootSpan)) {
+ localRootSpan = continueDetachedSpan(localRootSpan);
+ } else {
+ localRootSpan = null;
+ }
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Request after cleaning. Current span [{}]",
+ this.tracer.getCurrentSpan());
+ }
+ }
+
+ private boolean isRootParentSpan(Span localRootSpan) {
+ return localRootSpan.getSpanId() == localRootSpan.getTraceId();
+ }
+
+ private Span continueDetachedSpan(Span localRootSpan) {
+ if (log.isTraceEnabled()) {
+ log.trace("Will detach span {}", localRootSpan);
+ }
+ Span detachedSpan = this.tracer.detach(localRootSpan);
+ return this.tracer.continueSpan(detachedSpan);
+ }
+
+ @Override public void cancel() {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Cancel");
+ }
+ this.s.cancel();
+ }
+ finally {
+ cleanup();
+ }
+ }
+
+ @Override public void onNext(T o) {
+ this.subscriber.onNext(o);
+ }
+
+ @Override public void onError(Throwable throwable) {
+ try {
+ this.subscriber.onError(throwable);
+ }
+ finally {
+ cleanup();
+ }
+ }
+
+ @Override public void onComplete() {
+ try {
+ this.subscriber.onComplete();
+ }
+ finally {
+ cleanup();
+ }
+ }
+
+ void cleanup() {
+ if (compareAndSet(false, true)) {
+ if (log.isTraceEnabled()) {
+ log.trace("Cleaning up");
+ }
+ if (this.tracer.getCurrentSpan() != this.span) {
+ if (log.isTraceEnabled()) {
+ log.trace("Detaching span");
+ }
+ this.tracer.detach(this.tracer.getCurrentSpan());
+ this.tracer.continueSpan(this.span);
+ if (log.isTraceEnabled()) {
+ log.trace("Continuing span");
+ }
+ }
+ if (log.isTraceEnabled()) {
+ log.trace("Closing span");
+ }
+ this.tracer.close(this.span);
+ if (log.isTraceEnabled()) {
+ log.trace("Span closed");
+ }
+ if (this.rootSpan != null) {
+ this.tracer.continueSpan(this.rootSpan);
+ this.tracer.close(this.rootSpan);
+ if (log.isTraceEnabled()) {
+ log.trace("Closed root span");
+ }
+ }
+ }
+ }
+
+ @Override public Context currentContext() {
+ return this.context;
+ }
+}
\ No newline at end of file
diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java
new file mode 100644
index 000000000..afaa94075
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java
@@ -0,0 +1,66 @@
+package org.springframework.cloud.sleuth.instrument.reactor;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.cloud.sleuth.SpanNamer;
+import org.springframework.cloud.sleuth.TraceKeys;
+import org.springframework.cloud.sleuth.Tracer;
+import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
+import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService;
+import org.springframework.context.annotation.Configuration;
+
+import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration}
+ * to enable tracing of Reactor components via Spring Cloud Sleuth.
+ *
+ * @author Stephane Maldini
+ * @author Marcin Grzejszczak
+ * @since 1.3.0
+ */
+@Configuration
+@ConditionalOnProperty(value="spring.sleuth.reactor.enabled", matchIfMissing=true)
+@ConditionalOnClass(Mono.class)
+@AutoConfigureAfter(TraceAutoConfiguration.class)
+public class TraceReactorAutoConfiguration {
+
+ @Configuration
+ @ConditionalOnBean(Tracer.class)
+ static class TraceReactorConfiguration {
+ @Autowired Tracer tracer;
+ @Autowired TraceKeys traceKeys;
+ @Autowired SpanNamer spanNamer;
+
+ @PostConstruct
+ public void setupHooks() {
+ Hooks.onLastOperator(ReactorSleuth.spanOperator(this.tracer));
+ Schedulers.setFactory(new Schedulers.Factory() {
+ @Override public ScheduledExecutorService decorateExecutorService(
+ String schedulerType,
+ Supplier extends ScheduledExecutorService> actual) {
+ return new TraceableScheduledExecutorService(actual.get(),
+ TraceReactorConfiguration.this.tracer,
+ TraceReactorConfiguration.this.traceKeys,
+ TraceReactorConfiguration.this.spanNamer);
+ }
+ });
+ }
+
+ @PreDestroy
+ public void cleanupHooks() {
+ Hooks.resetOnLastOperator();
+ Schedulers.resetFactory();
+ }
+ }
+}
diff --git a/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories
new file mode 100644
index 000000000..3938ffd61
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,3 @@
+# Auto Configuration
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.springframework.cloud.sleuth.instrument.reactor.TraceReactorAutoConfiguration
diff --git a/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java
new file mode 100644
index 000000000..d1ff48b5c
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java
@@ -0,0 +1,195 @@
+package org.springframework.cloud.sleuth.instrument.reactor;
+
+import static org.assertj.core.api.BDDAssertions.then;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.awaitility.Awaitility;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.cloud.sleuth.Sampler;
+import org.springframework.cloud.sleuth.Span;
+import org.springframework.cloud.sleuth.Tracer;
+import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
+import org.springframework.cloud.sleuth.util.ExceptionUtils;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = SpanSubscriberTests.Config.class,
+ webEnvironment = SpringBootTest.WebEnvironment.NONE)
+public class SpanSubscriberTests {
+
+ private static final Log log = LogFactory.getLog(SpanSubscriberTests.class);
+
+ @Autowired Tracer tracer;
+
+ @Before
+ public void setup() {
+ ExceptionUtils.setFail(true);
+ }
+
+ @Test public void should_pass_tracing_info_when_using_reactor() {
+ Span span = this.tracer.createSpan("foo");
+ final AtomicReference spanInOperation = new AtomicReference<>();
+ Publisher traced = Flux.just(1, 2, 3);
+ log.info("Hello");
+
+ Flux.from(traced)
+ .map( d -> d + 1)
+ .map( d -> d + 1)
+ .map( (d) -> {
+ spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
+ return d + 1;
+ })
+ .map( d -> d + 1)
+ .subscribe(System.out::println);
+
+ then(this.tracer.getCurrentSpan()).isNull();
+ then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId());
+ then(ExceptionUtils.getLastException()).isNull();
+ }
+
+ @Test public void should_support_reactor_fusion_optimization() {
+ Span span = this.tracer.createSpan("foo");
+ final AtomicReference spanInOperation = new AtomicReference<>();
+ log.info("Hello");
+
+ Mono.just(1)
+ .flatMap( d -> Flux.just(d + 1).collectList().map(p -> p.get(0)))
+ .map( d -> d + 1)
+ .map( (d) -> {
+ spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
+ return d + 1;
+ })
+ .map( d -> d + 1)
+ .subscribe(System.out::println);
+
+ then(this.tracer.getCurrentSpan()).isNull();
+ then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId());
+ then(ExceptionUtils.getLastException()).isNull();
+ }
+
+ @Test public void should_not_trace_scalar_flows() {
+ this.tracer.createSpan("foo");
+ final AtomicReference spanInOperation = new AtomicReference<>();
+ log.info("Hello");
+
+ Mono.just(1)
+ .subscribe(new BaseSubscriber() {
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ spanInOperation.set(subscription);
+ }
+ });
+
+ then(this.tracer.getCurrentSpan()).isNotNull();
+ then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
+
+ Mono.error(new Exception())
+ .subscribe(new BaseSubscriber() {
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ spanInOperation.set(subscription);
+ }
+
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ }
+ });
+
+ then(this.tracer.getCurrentSpan()).isNotNull();
+ then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
+
+ Mono.empty()
+ .subscribe(new BaseSubscriber() {
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ spanInOperation.set(subscription);
+ }
+ });
+
+ then(this.tracer.getCurrentSpan()).isNotNull();
+ then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
+ then(ExceptionUtils.getLastException()).isNull();
+ }
+
+ @Test
+ public void should_pass_tracing_info_when_using_reactor_async() {
+
+ Span span = this.tracer.createSpan("foo");
+ final AtomicReference spanInOperation = new AtomicReference<>();
+ log.info("Hello");
+
+ Flux.just(1, 2, 3)
+ .publishOn(Schedulers.single())
+ .log("reactor.1")
+ .map( d -> d + 1)
+ .map( d -> d + 1)
+ .publishOn(Schedulers.newSingle("secondThread"))
+ .log("reactor.2")
+ .map( (d) -> {
+ spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
+ return d + 1;
+ })
+ .map( d -> d + 1)
+ .blockLast();
+
+ Awaitility.await().untilAsserted(() -> {
+ then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId());
+ then(ExceptionUtils.getLastException()).isNull();
+ });
+ then(this.tracer.getCurrentSpan()).isEqualTo(span);
+ this.tracer.close(span);
+
+ Span foo2 = this.tracer.createSpan("foo2");
+
+ Flux.just(1, 2, 3)
+ .publishOn(Schedulers.single())
+ .log("reactor.")
+ .map( d -> d + 1)
+ .map( d -> d + 1)
+ .map( (d) -> {
+ spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
+ return d + 1;
+ })
+ .map( d -> d + 1)
+ .blockLast();
+
+ then(this.tracer.getCurrentSpan()).isEqualTo(foo2);
+ then(ExceptionUtils.getLastException()).isNull();
+ // parent cause there's an async span in the meantime
+ then(spanInOperation.get().getTraceId()).isEqualTo(foo2.getTraceId());
+ tracer.close(foo2);
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ Hooks.resetOnLastOperator();
+ Schedulers.resetFactory();
+ }
+
+ @EnableAutoConfiguration
+ @Configuration
+ static class Config {
+ @Bean Sampler sampler() {
+ return new AlwaysSampler();
+ }
+ }
+}
\ No newline at end of file
diff --git a/spring-cloud-sleuth-reactor/src/test/resources/application.properties b/spring-cloud-sleuth-reactor/src/test/resources/application.properties
new file mode 100644
index 000000000..8edfd35f9
--- /dev/null
+++ b/spring-cloud-sleuth-reactor/src/test/resources/application.properties
@@ -0,0 +1 @@
+logging.level.org.springframework.cloud.sleuth.instrument.reactor=TRACE
\ No newline at end of file