Removing Reactor support from Sleuth
Sleuth requires Reactor in version 3.x whereas from Boot we get 2.0.8. That's a gigantic difference.
This commit is contained in:
13
pom.xml
13
pom.xml
@@ -30,7 +30,6 @@
|
||||
<module>spring-cloud-sleuth-core</module>
|
||||
<module>spring-cloud-sleuth-zipkin</module>
|
||||
<module>spring-cloud-sleuth-stream</module>
|
||||
<module>spring-cloud-sleuth-reactor</module>
|
||||
<module>spring-cloud-sleuth-zipkin-stream</module>
|
||||
<module>spring-cloud-starter-sleuth</module>
|
||||
<module>spring-cloud-starter-zipkin</module>
|
||||
@@ -196,16 +195,6 @@
|
||||
<version>2.1</version>
|
||||
<!-- not test because we need it in stream -->
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>${reactor.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.reactivestreams</groupId>
|
||||
<artifactId>reactive-streams</artifactId>
|
||||
<version>${reactive-streams.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.hamcrest</groupId>
|
||||
<artifactId>hamcrest-core</artifactId>
|
||||
@@ -243,8 +232,6 @@
|
||||
<spring-cloud-commons.version>1.3.0.BUILD-SNAPSHOT</spring-cloud-commons.version>
|
||||
<spring-cloud-stream.version>Ditmars.BUILD-SNAPSHOT</spring-cloud-stream.version>
|
||||
<spring-cloud-netflix.version>1.4.0.BUILD-SNAPSHOT</spring-cloud-netflix.version>
|
||||
<reactor.version>3.1.0.BUILD-SNAPSHOT</reactor.version>
|
||||
<reactive-streams.version>1.0.0</reactive-streams.version>
|
||||
</properties>
|
||||
|
||||
<profiles>
|
||||
|
||||
@@ -24,11 +24,6 @@
|
||||
<artifactId>spring-cloud-sleuth-core</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-reactor</artifactId>
|
||||
<version>${project.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-zipkin</artifactId>
|
||||
|
||||
@@ -1,96 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<artifactId>spring-cloud-sleuth-reactor</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
<name>Spring Cloud Sleuth Reactor</name>
|
||||
<description>Spring Cloud Sleuth Reactor</description>
|
||||
|
||||
<parent>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth</artifactId>
|
||||
<version>1.3.0.BUILD-SNAPSHOT</version>
|
||||
<relativePath>..</relativePath>
|
||||
</parent>
|
||||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-compiler-plugin</artifactId>
|
||||
<configuration>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</configuration>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>default-compile</id>
|
||||
<configuration>
|
||||
<showDeprecation>true</showDeprecation>
|
||||
<showWarnings>true</showWarnings>
|
||||
<compilerArguments>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</compilerArguments>
|
||||
</configuration>
|
||||
</execution>
|
||||
<execution>
|
||||
<id>default-testCompile</id>
|
||||
<configuration>
|
||||
<showDeprecation>true</showDeprecation>
|
||||
<showWarnings>true</showWarnings>
|
||||
<compilerArguments>
|
||||
<source>1.8</source>
|
||||
<target>1.8</target>
|
||||
</compilerArguments>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
</plugins>
|
||||
</build>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-sleuth-core</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.reactivestreams</groupId>
|
||||
<artifactId>reactive-streams</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.aspectj</groupId>
|
||||
<artifactId>aspectjweaver</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.awaitility</groupId>
|
||||
<artifactId>awaitility</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -1,52 +0,0 @@
|
||||
package org.springframework.cloud.sleuth.instrument.reactor;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.cloud.sleuth.Tracer;
|
||||
|
||||
import reactor.core.Fuseable;
|
||||
import reactor.core.Scannable;
|
||||
import reactor.core.publisher.Operators;
|
||||
|
||||
/**
|
||||
* Reactive Span pointcuts factories
|
||||
*
|
||||
* @author Stephane Maldini
|
||||
* @since 1.3.0
|
||||
*/
|
||||
public abstract class ReactorSleuth {
|
||||
|
||||
/**
|
||||
* Return a span operator pointcut given a {@link Tracer}. This can be used in reactor
|
||||
* via {@link reactor.core.publisher.Flux#transform(Function)}, {@link
|
||||
* reactor.core.publisher.Mono#transform(Function)}, {@link
|
||||
* reactor.core.publisher.Hooks#onEachOperator(Function)} or {@link
|
||||
* reactor.core.publisher.Hooks#onLastOperator(Function)}.
|
||||
*
|
||||
* @param tracer the {@link Tracer} instance to use in this span operator
|
||||
* @param <T> an arbitrary type that is left unchanged by the span operator
|
||||
*
|
||||
* @return a new Span operator pointcut
|
||||
*/
|
||||
public static <T> Function<? super Publisher<T>, ? extends Publisher<T>> spanOperator(Tracer tracer) {
|
||||
return Operators.lift(POINTCUT_FILTER, ((scannable, sub) -> {
|
||||
//do not trace fused flows
|
||||
if(scannable instanceof Fuseable && sub instanceof Fuseable.QueueSubscription){
|
||||
return sub;
|
||||
}
|
||||
return new SpanSubscriber<>(
|
||||
sub,
|
||||
sub.currentContext(),
|
||||
tracer,
|
||||
scannable.name());
|
||||
}));
|
||||
}
|
||||
|
||||
private static final Predicate<Scannable> POINTCUT_FILTER =
|
||||
s -> !(s instanceof Fuseable.ScalarCallable);
|
||||
|
||||
private ReactorSleuth() {
|
||||
}
|
||||
}
|
||||
@@ -1,177 +0,0 @@
|
||||
package org.springframework.cloud.sleuth.instrument.reactor;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
import org.springframework.cloud.sleuth.Tracer;
|
||||
import reactor.core.CoreSubscriber;
|
||||
import reactor.util.Logger;
|
||||
import reactor.util.Loggers;
|
||||
import reactor.util.context.Context;
|
||||
|
||||
/**
|
||||
* A trace representation of the {@link Subscriber}
|
||||
*
|
||||
* @author Stephane Maldini
|
||||
* @author Marcin Grzejszczak
|
||||
* @since 1.3.0
|
||||
*/
|
||||
final class SpanSubscriber<T> extends AtomicBoolean implements Subscription,
|
||||
CoreSubscriber<T> {
|
||||
|
||||
private static final Logger log = Loggers.getLogger(SpanSubscriber.class);
|
||||
|
||||
private final Span span;
|
||||
private final Span rootSpan;
|
||||
private final Subscriber<? super T> subscriber;
|
||||
private final Context context;
|
||||
private final Tracer tracer;
|
||||
private Subscription s;
|
||||
|
||||
SpanSubscriber(Subscriber<? super T> subscriber, Context ctx, Tracer tracer,
|
||||
String name) {
|
||||
this.subscriber = subscriber;
|
||||
this.tracer = tracer;
|
||||
Span root = ctx.getOrDefault(Span.class, tracer.getCurrentSpan());
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Span from context [{}]", root);
|
||||
}
|
||||
this.rootSpan = root;
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Stored context root span [{}]", this.rootSpan);
|
||||
}
|
||||
this.span = tracer.createSpan(name, root);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Created span [{}], with name [{}]", this.span, name);
|
||||
}
|
||||
this.context = ctx.put(Span.class, this.span);
|
||||
}
|
||||
|
||||
@Override public void onSubscribe(Subscription subscription) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("On subscribe");
|
||||
}
|
||||
this.s = subscription;
|
||||
this.tracer.continueSpan(this.span);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("On subscribe - span continued");
|
||||
}
|
||||
this.subscriber.onSubscribe(this);
|
||||
}
|
||||
|
||||
@Override public void request(long n) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Request");
|
||||
}
|
||||
this.tracer.continueSpan(this.span);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Request - continued");
|
||||
}
|
||||
this.s.request(n);
|
||||
// We're in the main thread so we don't want to pollute it with wrong spans
|
||||
// that's why we need to detach the current one and continue with its parent
|
||||
Span localRootSpan = this.span;
|
||||
while (localRootSpan != null) {
|
||||
if (this.rootSpan != null) {
|
||||
if (localRootSpan.getSpanId() != this.rootSpan.getSpanId() &&
|
||||
!isRootParentSpan(localRootSpan)) {
|
||||
localRootSpan = continueDetachedSpan(localRootSpan);
|
||||
} else {
|
||||
localRootSpan = null;
|
||||
}
|
||||
} else if (!isRootParentSpan(localRootSpan)) {
|
||||
localRootSpan = continueDetachedSpan(localRootSpan);
|
||||
} else {
|
||||
localRootSpan = null;
|
||||
}
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Request after cleaning. Current span [{}]",
|
||||
this.tracer.getCurrentSpan());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isRootParentSpan(Span localRootSpan) {
|
||||
return localRootSpan.getSpanId() == localRootSpan.getTraceId();
|
||||
}
|
||||
|
||||
private Span continueDetachedSpan(Span localRootSpan) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Will detach span {}", localRootSpan);
|
||||
}
|
||||
Span detachedSpan = this.tracer.detach(localRootSpan);
|
||||
return this.tracer.continueSpan(detachedSpan);
|
||||
}
|
||||
|
||||
@Override public void cancel() {
|
||||
try {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Cancel");
|
||||
}
|
||||
this.s.cancel();
|
||||
}
|
||||
finally {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onNext(T o) {
|
||||
this.subscriber.onNext(o);
|
||||
}
|
||||
|
||||
@Override public void onError(Throwable throwable) {
|
||||
try {
|
||||
this.subscriber.onError(throwable);
|
||||
}
|
||||
finally {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onComplete() {
|
||||
try {
|
||||
this.subscriber.onComplete();
|
||||
}
|
||||
finally {
|
||||
cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
void cleanup() {
|
||||
if (compareAndSet(false, true)) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Cleaning up");
|
||||
}
|
||||
if (this.tracer.getCurrentSpan() != this.span) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Detaching span");
|
||||
}
|
||||
this.tracer.detach(this.tracer.getCurrentSpan());
|
||||
this.tracer.continueSpan(this.span);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Continuing span");
|
||||
}
|
||||
}
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Closing span");
|
||||
}
|
||||
this.tracer.close(this.span);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Span closed");
|
||||
}
|
||||
if (this.rootSpan != null) {
|
||||
this.tracer.continueSpan(this.rootSpan);
|
||||
this.tracer.close(this.rootSpan);
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Closed root span");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public Context currentContext() {
|
||||
return this.context;
|
||||
}
|
||||
}
|
||||
@@ -1,59 +0,0 @@
|
||||
package org.springframework.cloud.sleuth.instrument.reactor;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cloud.sleuth.SpanNamer;
|
||||
import org.springframework.cloud.sleuth.TraceKeys;
|
||||
import org.springframework.cloud.sleuth.Tracer;
|
||||
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
|
||||
import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import reactor.core.publisher.Hooks;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
/**
|
||||
* {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration}
|
||||
* to enable tracing of Reactor components via Spring Cloud Sleuth.
|
||||
*
|
||||
* @author Stephane Maldini
|
||||
* @author Marcin Grzejszczak
|
||||
* @since 1.3.0
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty(value="spring.sleuth.reactor.enabled", matchIfMissing=true)
|
||||
@ConditionalOnClass(Mono.class)
|
||||
@AutoConfigureAfter(TraceAutoConfiguration.class)
|
||||
public class TraceReactorAutoConfiguration {
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnBean(Tracer.class)
|
||||
static class TraceReactorConfiguration {
|
||||
@Autowired Tracer tracer;
|
||||
@Autowired TraceKeys traceKeys;
|
||||
@Autowired SpanNamer spanNamer;
|
||||
|
||||
@PostConstruct
|
||||
public void setupHooks() {
|
||||
Hooks.onLastOperator(ReactorSleuth.spanOperator(this.tracer));
|
||||
Schedulers.setFactory(new Schedulers.Factory() {
|
||||
@Override public ScheduledExecutorService decorateScheduledExecutorService(
|
||||
String schedulerType,
|
||||
Supplier<? extends ScheduledExecutorService> actual) {
|
||||
return new TraceableScheduledExecutorService(actual.get(),
|
||||
TraceReactorConfiguration.this.tracer,
|
||||
TraceReactorConfiguration.this.traceKeys,
|
||||
TraceReactorConfiguration.this.spanNamer);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,3 +0,0 @@
|
||||
# Auto Configuration
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.sleuth.instrument.reactor.TraceReactorAutoConfiguration
|
||||
@@ -1,188 +0,0 @@
|
||||
package org.springframework.cloud.sleuth.instrument.reactor;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.awaitility.Awaitility;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.reactivestreams.Publisher;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.sleuth.Sampler;
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
import org.springframework.cloud.sleuth.Tracer;
|
||||
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
|
||||
import org.springframework.cloud.sleuth.util.ExceptionUtils;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.test.context.junit4.SpringRunner;
|
||||
|
||||
import org.reactivestreams.Subscriber;
|
||||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.BaseSubscriber;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import static org.assertj.core.api.BDDAssertions.then;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = SpanSubscriberTests.Config.class,
|
||||
webEnvironment = SpringBootTest.WebEnvironment.NONE)
|
||||
public class SpanSubscriberTests {
|
||||
|
||||
private static final Log log = LogFactory.getLog(SpanSubscriberTests.class);
|
||||
|
||||
@Autowired Tracer tracer;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
ExceptionUtils.setFail(true);
|
||||
}
|
||||
|
||||
@Test public void should_pass_tracing_info_when_using_reactor() {
|
||||
Span span = this.tracer.createSpan("foo");
|
||||
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
|
||||
Publisher<Integer> traced = Flux.just(1, 2, 3);
|
||||
log.info("Hello");
|
||||
|
||||
Flux.from(traced)
|
||||
.map( d -> d + 1)
|
||||
.map( d -> d + 1)
|
||||
.map( (d) -> {
|
||||
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
|
||||
return d + 1;
|
||||
})
|
||||
.map( d -> d + 1)
|
||||
.subscribe(System.out::println);
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isNull();
|
||||
then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId());
|
||||
then(ExceptionUtils.getLastException()).isNull();
|
||||
}
|
||||
|
||||
@Test public void should_support_reactor_fusion_optimization() {
|
||||
Span span = this.tracer.createSpan("foo");
|
||||
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
|
||||
log.info("Hello");
|
||||
|
||||
Mono.just(1)
|
||||
.flatMap( d -> Flux.just(d + 1).collectList().map(p -> p.get(0)))
|
||||
.map( d -> d + 1)
|
||||
.map( (d) -> {
|
||||
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
|
||||
return d + 1;
|
||||
})
|
||||
.map( d -> d + 1)
|
||||
.subscribe(System.out::println);
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isNull();
|
||||
then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId());
|
||||
then(ExceptionUtils.getLastException()).isNull();
|
||||
}
|
||||
|
||||
@Test public void should_not_trace_scalar_flows() {
|
||||
this.tracer.createSpan("foo");
|
||||
final AtomicReference<Subscription> spanInOperation = new AtomicReference<>();
|
||||
log.info("Hello");
|
||||
|
||||
Mono.just(1)
|
||||
.subscribe(new BaseSubscriber<Integer>() {
|
||||
@Override
|
||||
protected void hookOnSubscribe(Subscription subscription) {
|
||||
spanInOperation.set(subscription);
|
||||
}
|
||||
});
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isNotNull();
|
||||
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
|
||||
|
||||
Mono.<Integer>error(new Exception())
|
||||
.subscribe(new BaseSubscriber<Integer>() {
|
||||
@Override
|
||||
protected void hookOnSubscribe(Subscription subscription) {
|
||||
spanInOperation.set(subscription);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void hookOnError(Throwable throwable) {
|
||||
}
|
||||
});
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isNotNull();
|
||||
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
|
||||
|
||||
Mono.<Integer>empty()
|
||||
.subscribe(new BaseSubscriber<Integer>() {
|
||||
@Override
|
||||
protected void hookOnSubscribe(Subscription subscription) {
|
||||
spanInOperation.set(subscription);
|
||||
}
|
||||
});
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isNotNull();
|
||||
then(spanInOperation.get()).isNotInstanceOf(SpanSubscriber.class);
|
||||
then(ExceptionUtils.getLastException()).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void should_pass_tracing_info_when_using_reactor_async() {
|
||||
|
||||
Span span = this.tracer.createSpan("foo");
|
||||
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
|
||||
log.info("Hello");
|
||||
|
||||
Flux.just(1, 2, 3)
|
||||
.publishOn(Schedulers.single())
|
||||
.log("reactor.1")
|
||||
.map( d -> d + 1)
|
||||
.map( d -> d + 1)
|
||||
.publishOn(Schedulers.newSingle("secondThread"))
|
||||
.log("reactor.2")
|
||||
.map( (d) -> {
|
||||
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
|
||||
return d + 1;
|
||||
})
|
||||
.map( d -> d + 1)
|
||||
.blockLast();
|
||||
|
||||
Awaitility.await().untilAsserted(() -> {
|
||||
then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId());
|
||||
then(ExceptionUtils.getLastException()).isNull();
|
||||
});
|
||||
then(this.tracer.getCurrentSpan()).isEqualTo(span);
|
||||
this.tracer.close(span);
|
||||
|
||||
Span foo2 = this.tracer.createSpan("foo2");
|
||||
|
||||
Flux.just(1, 2, 3)
|
||||
.publishOn(Schedulers.single())
|
||||
.log("reactor.")
|
||||
.map( d -> d + 1)
|
||||
.map( d -> d + 1)
|
||||
.map( (d) -> {
|
||||
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
|
||||
return d + 1;
|
||||
})
|
||||
.map( d -> d + 1)
|
||||
.blockLast();
|
||||
|
||||
then(this.tracer.getCurrentSpan()).isEqualTo(foo2);
|
||||
then(ExceptionUtils.getLastException()).isNull();
|
||||
// parent cause there's an async span in the meantime
|
||||
then(spanInOperation.get().getSavedSpan().getParents().get(0)).isEqualTo(foo2.getSpanId());
|
||||
tracer.close(foo2);
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
static class Config {
|
||||
@Bean Sampler sampler() {
|
||||
return new AlwaysSampler();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
logging.level.org.springframework.cloud.sleuth.instrument.reactor=TRACE
|
||||
Reference in New Issue
Block a user