diff --git a/pom.xml b/pom.xml index 7d71fc8d2..9994ccf10 100644 --- a/pom.xml +++ b/pom.xml @@ -30,7 +30,6 @@ spring-cloud-sleuth-core spring-cloud-sleuth-zipkin spring-cloud-sleuth-stream - spring-cloud-sleuth-reactor spring-cloud-sleuth-zipkin-stream spring-cloud-starter-sleuth spring-cloud-starter-zipkin @@ -196,16 +195,6 @@ 2.1 - - io.projectreactor - reactor-core - ${reactor.version} - - - org.reactivestreams - reactive-streams - ${reactive-streams.version} - org.hamcrest hamcrest-core @@ -243,8 +232,6 @@ 1.3.0.BUILD-SNAPSHOT Ditmars.BUILD-SNAPSHOT 1.4.0.BUILD-SNAPSHOT - 3.1.0.BUILD-SNAPSHOT - 1.0.0 diff --git a/spring-cloud-sleuth-dependencies/pom.xml b/spring-cloud-sleuth-dependencies/pom.xml index 84483e180..ffba32795 100644 --- a/spring-cloud-sleuth-dependencies/pom.xml +++ b/spring-cloud-sleuth-dependencies/pom.xml @@ -24,11 +24,6 @@ spring-cloud-sleuth-core ${project.version} - - org.springframework.cloud - spring-cloud-sleuth-reactor - ${project.version} - org.springframework.cloud spring-cloud-sleuth-zipkin diff --git a/spring-cloud-sleuth-reactor/.jdk8 b/spring-cloud-sleuth-reactor/.jdk8 deleted file mode 100644 index e69de29bb..000000000 diff --git a/spring-cloud-sleuth-reactor/pom.xml b/spring-cloud-sleuth-reactor/pom.xml deleted file mode 100644 index f443d879e..000000000 --- a/spring-cloud-sleuth-reactor/pom.xml +++ /dev/null @@ -1,96 +0,0 @@ - - - 4.0.0 - - spring-cloud-sleuth-reactor - jar - Spring Cloud Sleuth Reactor - Spring Cloud Sleuth Reactor - - - org.springframework.cloud - spring-cloud-sleuth - 1.3.0.BUILD-SNAPSHOT - .. - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - - - default-compile - - true - true - - 1.8 - 1.8 - - - - - default-testCompile - - true - true - - 1.8 - 1.8 - - - - - - - - - - - org.springframework.cloud - spring-cloud-sleuth-core - - - org.springframework.boot - spring-boot-starter - - - io.projectreactor - reactor-core - true - - - org.reactivestreams - reactive-streams - true - - - org.aspectj - aspectjweaver - test - - - org.assertj - assertj-core - test - - - org.awaitility - awaitility - test - - - org.springframework.boot - spring-boot-starter-test - test - - - - diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java deleted file mode 100644 index d8c6b244a..000000000 --- a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/ReactorSleuth.java +++ /dev/null @@ -1,52 +0,0 @@ -package org.springframework.cloud.sleuth.instrument.reactor; - -import java.util.function.Function; -import java.util.function.Predicate; - -import org.reactivestreams.Publisher; -import org.springframework.cloud.sleuth.Tracer; - -import reactor.core.Fuseable; -import reactor.core.Scannable; -import reactor.core.publisher.Operators; - -/** - * Reactive Span pointcuts factories - * - * @author Stephane Maldini - * @since 1.3.0 - */ -public abstract class ReactorSleuth { - - /** - * Return a span operator pointcut given a {@link Tracer}. This can be used in reactor - * via {@link reactor.core.publisher.Flux#transform(Function)}, {@link - * reactor.core.publisher.Mono#transform(Function)}, {@link - * reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link - * reactor.core.publisher.Hooks#onLastOperator(Function)}. - * - * @param tracer the {@link Tracer} instance to use in this span operator - * @param an arbitrary type that is left unchanged by the span operator - * - * @return a new Span operator pointcut - */ - public static Function, ? extends Publisher> spanOperator(Tracer tracer) { - return Operators.lift(POINTCUT_FILTER, ((scannable, sub) -> { - //do not trace fused flows - if(scannable instanceof Fuseable && sub instanceof Fuseable.QueueSubscription){ - return sub; - } - return new SpanSubscriber<>( - sub, - sub.currentContext(), - tracer, - scannable.name()); - })); - } - - private static final Predicate POINTCUT_FILTER = - s -> !(s instanceof Fuseable.ScalarCallable); - - private ReactorSleuth() { - } -} diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java deleted file mode 100644 index f401c919f..000000000 --- a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriber.java +++ /dev/null @@ -1,177 +0,0 @@ -package org.springframework.cloud.sleuth.instrument.reactor; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.springframework.cloud.sleuth.Span; -import org.springframework.cloud.sleuth.Tracer; -import reactor.core.CoreSubscriber; -import reactor.util.Logger; -import reactor.util.Loggers; -import reactor.util.context.Context; - -/** - * A trace representation of the {@link Subscriber} - * - * @author Stephane Maldini - * @author Marcin Grzejszczak - * @since 1.3.0 - */ -final class SpanSubscriber extends AtomicBoolean implements Subscription, - CoreSubscriber { - - private static final Logger log = Loggers.getLogger(SpanSubscriber.class); - - private final Span span; - private final Span rootSpan; - private final Subscriber subscriber; - private final Context context; - private final Tracer tracer; - private Subscription s; - - SpanSubscriber(Subscriber subscriber, Context ctx, Tracer tracer, - String name) { - this.subscriber = subscriber; - this.tracer = tracer; - Span root = ctx.getOrDefault(Span.class, tracer.getCurrentSpan()); - if (log.isTraceEnabled()) { - log.trace("Span from context [{}]", root); - } - this.rootSpan = root; - if (log.isTraceEnabled()) { - log.trace("Stored context root span [{}]", this.rootSpan); - } - this.span = tracer.createSpan(name, root); - if (log.isTraceEnabled()) { - log.trace("Created span [{}], with name [{}]", this.span, name); - } - this.context = ctx.put(Span.class, this.span); - } - - @Override public void onSubscribe(Subscription subscription) { - if (log.isTraceEnabled()) { - log.trace("On subscribe"); - } - this.s = subscription; - this.tracer.continueSpan(this.span); - if (log.isTraceEnabled()) { - log.trace("On subscribe - span continued"); - } - this.subscriber.onSubscribe(this); - } - - @Override public void request(long n) { - if (log.isTraceEnabled()) { - log.trace("Request"); - } - this.tracer.continueSpan(this.span); - if (log.isTraceEnabled()) { - log.trace("Request - continued"); - } - this.s.request(n); - // We're in the main thread so we don't want to pollute it with wrong spans - // that's why we need to detach the current one and continue with its parent - Span localRootSpan = this.span; - while (localRootSpan != null) { - if (this.rootSpan != null) { - if (localRootSpan.getSpanId() != this.rootSpan.getSpanId() && - !isRootParentSpan(localRootSpan)) { - localRootSpan = continueDetachedSpan(localRootSpan); - } else { - localRootSpan = null; - } - } else if (!isRootParentSpan(localRootSpan)) { - localRootSpan = continueDetachedSpan(localRootSpan); - } else { - localRootSpan = null; - } - } - if (log.isTraceEnabled()) { - log.trace("Request after cleaning. Current span [{}]", - this.tracer.getCurrentSpan()); - } - } - - private boolean isRootParentSpan(Span localRootSpan) { - return localRootSpan.getSpanId() == localRootSpan.getTraceId(); - } - - private Span continueDetachedSpan(Span localRootSpan) { - if (log.isTraceEnabled()) { - log.trace("Will detach span {}", localRootSpan); - } - Span detachedSpan = this.tracer.detach(localRootSpan); - return this.tracer.continueSpan(detachedSpan); - } - - @Override public void cancel() { - try { - if (log.isTraceEnabled()) { - log.trace("Cancel"); - } - this.s.cancel(); - } - finally { - cleanup(); - } - } - - @Override public void onNext(T o) { - this.subscriber.onNext(o); - } - - @Override public void onError(Throwable throwable) { - try { - this.subscriber.onError(throwable); - } - finally { - cleanup(); - } - } - - @Override public void onComplete() { - try { - this.subscriber.onComplete(); - } - finally { - cleanup(); - } - } - - void cleanup() { - if (compareAndSet(false, true)) { - if (log.isTraceEnabled()) { - log.trace("Cleaning up"); - } - if (this.tracer.getCurrentSpan() != this.span) { - if (log.isTraceEnabled()) { - log.trace("Detaching span"); - } - this.tracer.detach(this.tracer.getCurrentSpan()); - this.tracer.continueSpan(this.span); - if (log.isTraceEnabled()) { - log.trace("Continuing span"); - } - } - if (log.isTraceEnabled()) { - log.trace("Closing span"); - } - this.tracer.close(this.span); - if (log.isTraceEnabled()) { - log.trace("Span closed"); - } - if (this.rootSpan != null) { - this.tracer.continueSpan(this.rootSpan); - this.tracer.close(this.rootSpan); - if (log.isTraceEnabled()) { - log.trace("Closed root span"); - } - } - } - } - - @Override public Context currentContext() { - return this.context; - } -} \ No newline at end of file diff --git a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java b/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java deleted file mode 100644 index f86cc2b71..000000000 --- a/spring-cloud-sleuth-reactor/src/main/java/org/springframework/cloud/sleuth/instrument/reactor/TraceReactorAutoConfiguration.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.springframework.cloud.sleuth.instrument.reactor; - -import javax.annotation.PostConstruct; -import java.util.concurrent.ScheduledExecutorService; -import java.util.function.Supplier; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.AutoConfigureAfter; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.cloud.sleuth.SpanNamer; -import org.springframework.cloud.sleuth.TraceKeys; -import org.springframework.cloud.sleuth.Tracer; -import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration; -import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService; -import org.springframework.context.annotation.Configuration; - -import reactor.core.publisher.Hooks; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -/** - * {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration} - * to enable tracing of Reactor components via Spring Cloud Sleuth. - * - * @author Stephane Maldini - * @author Marcin Grzejszczak - * @since 1.3.0 - */ -@Configuration -@ConditionalOnProperty(value="spring.sleuth.reactor.enabled", matchIfMissing=true) -@ConditionalOnClass(Mono.class) -@AutoConfigureAfter(TraceAutoConfiguration.class) -public class TraceReactorAutoConfiguration { - - @Configuration - @ConditionalOnBean(Tracer.class) - static class TraceReactorConfiguration { - @Autowired Tracer tracer; - @Autowired TraceKeys traceKeys; - @Autowired SpanNamer spanNamer; - - @PostConstruct - public void setupHooks() { - Hooks.onLastOperator(ReactorSleuth.spanOperator(this.tracer)); - Schedulers.setFactory(new Schedulers.Factory() { - @Override public ScheduledExecutorService decorateScheduledExecutorService( - String schedulerType, - Supplier actual) { - return new TraceableScheduledExecutorService(actual.get(), - TraceReactorConfiguration.this.tracer, - TraceReactorConfiguration.this.traceKeys, - TraceReactorConfiguration.this.spanNamer); - } - }); - } - } -} diff --git a/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories deleted file mode 100644 index 3938ffd61..000000000 --- a/spring-cloud-sleuth-reactor/src/main/resources/META-INF/spring.factories +++ /dev/null @@ -1,3 +0,0 @@ -# Auto Configuration -org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.cloud.sleuth.instrument.reactor.TraceReactorAutoConfiguration diff --git a/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java b/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java deleted file mode 100644 index 8ea6e2901..000000000 --- a/spring-cloud-sleuth-reactor/src/test/java/org/springframework/cloud/sleuth/instrument/reactor/SpanSubscriberTests.java +++ /dev/null @@ -1,188 +0,0 @@ -package org.springframework.cloud.sleuth.instrument.reactor; - -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.awaitility.Awaitility; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.reactivestreams.Publisher; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.cloud.sleuth.Sampler; -import org.springframework.cloud.sleuth.Span; -import org.springframework.cloud.sleuth.Tracer; -import org.springframework.cloud.sleuth.sampler.AlwaysSampler; -import org.springframework.cloud.sleuth.util.ExceptionUtils; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.test.context.junit4.SpringRunner; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.core.publisher.BaseSubscriber; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; - -import static org.assertj.core.api.BDDAssertions.then; - -@RunWith(SpringRunner.class) -@SpringBootTest(classes = SpanSubscriberTests.Config.class, - webEnvironment = SpringBootTest.WebEnvironment.NONE) -public class SpanSubscriberTests { - - private static final Log log = LogFactory.getLog(SpanSubscriberTests.class); - - @Autowired Tracer tracer; - - @Before - public void setup() { - ExceptionUtils.setFail(true); - } - - @Test public void should_pass_tracing_info_when_using_reactor() { - Span span = this.tracer.createSpan("foo"); - final AtomicReference spanInOperation = new AtomicReference<>(); - Publisher traced = Flux.just(1, 2, 3); - log.info("Hello"); - - Flux.from(traced) - .map( d -> d + 1) - .map( d -> d + 1) - .map( (d) -> { - spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); - return d + 1; - }) - .map( d -> d + 1) - .subscribe(System.out::println); - - then(this.tracer.getCurrentSpan()).isNull(); - then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId()); - then(ExceptionUtils.getLastException()).isNull(); - } - - @Test public void should_support_reactor_fusion_optimization() { - Span span = this.tracer.createSpan("foo"); - final AtomicReference spanInOperation = new AtomicReference<>(); - log.info("Hello"); - - Mono.just(1) - .flatMap( d -> Flux.just(d + 1).collectList().map(p -> p.get(0))) - .map( d -> d + 1) - .map( (d) -> { - spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); - return d + 1; - }) - .map( d -> d + 1) - .subscribe(System.out::println); - - then(this.tracer.getCurrentSpan()).isNull(); - then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId()); - then(ExceptionUtils.getLastException()).isNull(); - } - - @Test public void should_not_trace_scalar_flows() { - this.tracer.createSpan("foo"); - final AtomicReference spanInOperation = new AtomicReference<>(); - log.info("Hello"); - - Mono.just(1) - .subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - }); - - then(this.tracer.getCurrentSpan()).isNotNull(); - then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class); - - Mono.error(new Exception()) - .subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - - @Override - protected void hookOnError(Throwable throwable) { - } - }); - - then(this.tracer.getCurrentSpan()).isNotNull(); - then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class); - - Mono.empty() - .subscribe(new BaseSubscriber() { - @Override - protected void hookOnSubscribe(Subscription subscription) { - spanInOperation.set(subscription); - } - }); - - then(this.tracer.getCurrentSpan()).isNotNull(); - then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class); - then(ExceptionUtils.getLastException()).isNull(); - } - - @Test - public void should_pass_tracing_info_when_using_reactor_async() { - - Span span = this.tracer.createSpan("foo"); - final AtomicReference spanInOperation = new AtomicReference<>(); - log.info("Hello"); - - Flux.just(1, 2, 3) - .publishOn(Schedulers.single()) - .log("reactor.1") - .map( d -> d + 1) - .map( d -> d + 1) - .publishOn(Schedulers.newSingle("secondThread")) - .log("reactor.2") - .map( (d) -> { - spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); - return d + 1; - }) - .map( d -> d + 1) - .blockLast(); - - Awaitility.await().untilAsserted(() -> { - then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId()); - then(ExceptionUtils.getLastException()).isNull(); - }); - then(this.tracer.getCurrentSpan()).isEqualTo(span); - this.tracer.close(span); - - Span foo2 = this.tracer.createSpan("foo2"); - - Flux.just(1, 2, 3) - .publishOn(Schedulers.single()) - .log("reactor.") - .map( d -> d + 1) - .map( d -> d + 1) - .map( (d) -> { - spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan()); - return d + 1; - }) - .map( d -> d + 1) - .blockLast(); - - then(this.tracer.getCurrentSpan()).isEqualTo(foo2); - then(ExceptionUtils.getLastException()).isNull(); - // parent cause there's an async span in the meantime - then(spanInOperation.get().getSavedSpan().getParents().get(0)).isEqualTo(foo2.getSpanId()); - tracer.close(foo2); - } - - @EnableAutoConfiguration - @Configuration - static class Config { - @Bean Sampler sampler() { - return new AlwaysSampler(); - } - } -} \ No newline at end of file diff --git a/spring-cloud-sleuth-reactor/src/test/resources/application.properties b/spring-cloud-sleuth-reactor/src/test/resources/application.properties deleted file mode 100644 index 8edfd35f9..000000000 --- a/spring-cloud-sleuth-reactor/src/test/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -logging.level.org.springframework.cloud.sleuth.instrument.reactor=TRACE \ No newline at end of file