Sync with reactor-core 3.1.B-S latest hooks updates (#672)

This commit is contained in:
Stephane Maldini
2017-08-05 13:13:09 -07:00
committed by Marcin Grzejszczak
parent 3034497617
commit e698607184
3 changed files with 58 additions and 13 deletions

View File

@@ -0,0 +1,52 @@
package org.springframework.cloud.sleuth.instrument.reactor;
import java.util.function.Function;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.Operators;
import org.springframework.cloud.sleuth.Tracer;
/**
* Reactive Span pointcuts factories
*
* @author Stephane Maldini
*/
public abstract class ReactorSleuth {
/**
* Return a span operator pointcut given a {@link Tracer}. This can be used in reactor
* via {@link reactor.core.publisher.Flux#transform(Function)}, {@link
* reactor.core.publisher.Mono#transform(Function)}, {@link
* reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link
* reactor.core.publisher.Hooks#onLastOperator(Function)}.
*
* @param tracer the {@link Tracer} instance to use in this span operator
* @param <T> an arbitrary type that is left unchanged by the span operator
*
* @return a new Span operator pointcut
*/
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> spanOperator(Tracer tracer) {
return Operators.lift(POINTCUT_FILTER, ((scannable, sub) -> {
//do not trace fused flows
if(scannable instanceof Fuseable && sub instanceof Fuseable.QueueSubscription){
return sub;
}
return new SpanSubscriber<>(
sub,
sub.currentContext(),
tracer,
scannable.name());
}));
}
private static final Predicate<Scannable> POINTCUT_FILTER =
s -> !(s instanceof Fuseable.ScalarCallable);
private ReactorSleuth() {
}
}

View File

@@ -18,18 +18,19 @@ import reactor.util.context.Context;
* @author Marcin Grzejszczak
* @since 1.3.0
*/
class SpanSubscriber extends AtomicBoolean implements Subscription, CoreSubscriber<Object> {
final class SpanSubscriber<T> extends AtomicBoolean implements Subscription,
CoreSubscriber<T> {
private static final Logger log = Loggers.getLogger(SpanSubscriber.class);
private final Span span;
private final Span rootSpan;
private final Subscriber<? super Object> subscriber;
private final Subscriber<? super T> subscriber;
private final Context context;
private final Tracer tracer;
private Subscription s;
SpanSubscriber(Subscriber<? super Object> subscriber, Context ctx, Tracer tracer,
SpanSubscriber(Subscriber<? super T> subscriber, Context ctx, Tracer tracer,
String name) {
this.subscriber = subscriber;
this.tracer = tracer;
@@ -116,7 +117,7 @@ class SpanSubscriber extends AtomicBoolean implements Subscription, CoreSubscrib
}
}
@Override public void onNext(Object o) {
@Override public void onNext(T o) {
this.subscriber.onNext(o);
}

View File

@@ -44,15 +44,7 @@ public class TraceReactorAutoConfiguration {
@PostConstruct
public void setupHooks() {
Hooks.onNewSubscriber((pub, sub) -> {
//do not trace fused flows or simple just/error/empty
if(pub instanceof Fuseable && sub instanceof Fuseable.QueueSubscription
|| pub instanceof Fuseable.ScalarCallable){
return sub;
}
return new SpanSubscriber(sub, sub.currentContext(), this.tracer, pub
.toString());
});
Hooks.onLastOperator(ReactorSleuth.spanOperator(tracer));
Schedulers.setFactory(new Schedulers.Factory() {
@Override public ScheduledExecutorService decorateScheduledExecutorService(
String schedulerType,