diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java new file mode 100644 index 000000000..3f203641e --- /dev/null +++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java @@ -0,0 +1,52 @@ +package org.springframework.cloud.sleuth.instrument.reactor; + +import java.util.function.Function; +import java.util.function.Predicate; + +import org.reactivestreams.Publisher; +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.core.publisher.Operators; + +import org.springframework.cloud.sleuth.Tracer; + +/** + * Reactive Span pointcuts factories + * + * @author Stephane Maldini + */ +public abstract class ReactorSleuth { + + /** + * Return a span operator pointcut given a {@link Tracer}. This can be used in reactor + * via {@link reactor.core.publisher.Flux#transform(Function)}, {@link + * reactor.core.publisher.Mono#transform(Function)}, {@link + * reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link + * reactor.core.publisher.Hooks#onLastOperator(Function)}. + * + * @param tracer the {@link Tracer} instance to use in this span operator + * @param an arbitrary type that is left unchanged by the span operator + * + * @return a new Span operator pointcut + */ + public static Function, ? extends Publisher> spanOperator(Tracer tracer) { + return Operators.lift(POINTCUT_FILTER, ((scannable, sub) -> { + //do not trace fused flows + if(scannable instanceof Fuseable && sub instanceof Fuseable.QueueSubscription){ + return sub; + } + + return new SpanSubscriber<>( + sub, + sub.currentContext(), + tracer, + scannable.name()); + })); + } + + private static final Predicate POINTCUT_FILTER = + s -> !(s instanceof Fuseable.ScalarCallable); + + private ReactorSleuth() { + } +} diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java index eaa976613..f401c919f 100644 --- a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java +++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java @@ -18,18 +18,19 @@ import reactor.util.context.Context; * @author Marcin Grzejszczak * @since 1.3.0 */ -class SpanSubscriber extends AtomicBoolean implements Subscription, CoreSubscriber { +final class SpanSubscriber extends AtomicBoolean implements Subscription, + CoreSubscriber { private static final Logger log = Loggers.getLogger(SpanSubscriber.class); private final Span span; private final Span rootSpan; - private final Subscriber subscriber; + private final Subscriber subscriber; private final Context context; private final Tracer tracer; private Subscription s; - SpanSubscriber(Subscriber subscriber, Context ctx, Tracer tracer, + SpanSubscriber(Subscriber subscriber, Context ctx, Tracer tracer, String name) { this.subscriber = subscriber; this.tracer = tracer; @@ -116,7 +117,7 @@ class SpanSubscriber extends AtomicBoolean implements Subscription, CoreSubscrib } } - @Override public void onNext(Object o) { + @Override public void onNext(T o) { this.subscriber.onNext(o); } diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java index 2d98823a3..91c74662b 100644 --- a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java +++ b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java @@ -44,15 +44,7 @@ public class TraceReactorAutoConfiguration { @PostConstruct public void setupHooks() { - Hooks.onNewSubscriber((pub, sub) -> { - //do not trace fused flows or simple just/error/empty - if(pub instanceof Fuseable && sub instanceof Fuseable.QueueSubscription - || pub instanceof Fuseable.ScalarCallable){ - return sub; - } - return new SpanSubscriber(sub, sub.currentContext(), this.tracer, pub - .toString()); - }); + Hooks.onLastOperator(ReactorSleuth.spanOperator(tracer)); Schedulers.setFactory(new Schedulers.Factory() { @Override public ScheduledExecutorService decorateScheduledExecutorService( String schedulerType,