Initial Reactor support

This commit is contained in:
Marcin Grzejszczak
2017-07-07 11:16:17 +02:00
parent fd6bfd6707
commit 3e7e54e09b
8 changed files with 469 additions and 0 deletions

View File

View File

@@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>spring-cloud-sleuth-reactor</artifactId>
<packaging>jar</packaging>
<name>Spring Cloud Sleuth Reactor</name>
<description>Spring Cloud Sleuth Reactor</description>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth</artifactId>
<version>1.3.0.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
<executions>
<execution>
<id>default-compile</id>
<configuration>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
<compilerArguments>
<source>1.8</source>
<target>1.8</target>
</compilerArguments>
</configuration>
</execution>
<execution>
<id>default-testCompile</id>
<configuration>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
<compilerArguments>
<source>1.8</source>
<target>1.8</target>
</compilerArguments>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-sleuth-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,177 @@
package org.springframework.cloud.sleuth.reactor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;
import reactor.util.context.Contextualized;
/**
* A trace representation of the {@link Subscriber}
*
* @author Stephane Maldini
* @author Marcin Grzejszczak
* @since 1.3.0
*/
class SpanSubscriber extends AtomicBoolean
implements Subscriber<Object>, Subscription, Contextualized {
private static final Logger log = Loggers.getLogger(SpanSubscriber.class);
private final Span span;
private final Span rootSpan;
private final Subscriber<? super Object> subscriber;
private final Context context;
private final Tracer tracer;
private Subscription s;
SpanSubscriber(Subscriber<? super Object> subscriber, Context ctx, Tracer tracer,
String name) {
this.subscriber = subscriber;
this.tracer = tracer;
Span root = ctx.getOrDefault(Span.class, tracer.getCurrentSpan());
if (log.isTraceEnabled()) {
log.trace("Span from context [{}]", root);
}
this.rootSpan = root;
if (log.isTraceEnabled()) {
log.trace("Stored context root span [{}]", this.rootSpan);
}
this.span = tracer.createSpan(name, root);
if (log.isTraceEnabled()) {
log.trace("Created span [{}], with name [{}]", this.span, name);
}
this.context = ctx.put(Span.class, this.span);
}
@Override public void onSubscribe(Subscription subscription) {
if (log.isTraceEnabled()) {
log.trace("On subscribe");
}
this.s = subscription;
this.tracer.continueSpan(this.span);
if (log.isTraceEnabled()) {
log.trace("On subscribe - span continued");
}
this.subscriber.onSubscribe(this);
}
@Override public void request(long n) {
if (log.isTraceEnabled()) {
log.trace("Request");
}
this.tracer.continueSpan(this.span);
if (log.isTraceEnabled()) {
log.trace("Request - continued");
}
this.s.request(n);
// We're in the main thread so we don't want to pollute it with wrong spans
// that's why we need to detach the current one and continue with its parent
Span localRootSpan = this.span;
while (localRootSpan != null) {
if (this.rootSpan != null) {
if (localRootSpan.getSpanId() != this.rootSpan.getSpanId() &&
!isRootParentSpan(localRootSpan)) {
localRootSpan = continueDetachedSpan(localRootSpan);
} else {
localRootSpan = null;
}
} else if (!isRootParentSpan(localRootSpan)) {
localRootSpan = continueDetachedSpan(localRootSpan);
} else {
localRootSpan = null;
}
}
if (log.isTraceEnabled()) {
log.trace("Request after cleaning. Current span [{}]",
this.tracer.getCurrentSpan());
}
}
private boolean isRootParentSpan(Span localRootSpan) {
return localRootSpan.getSpanId() == localRootSpan.getTraceId();
}
private Span continueDetachedSpan(Span localRootSpan) {
if (log.isTraceEnabled()) {
log.trace("Will detach span {}", localRootSpan);
}
Span detachedSpan = this.tracer.detach(localRootSpan);
return this.tracer.continueSpan(detachedSpan);
}
@Override public void cancel() {
try {
if (log.isTraceEnabled()) {
log.trace("Cancel");
}
this.s.cancel();
}
finally {
cleanup();
}
}
@Override public void onNext(Object o) {
this.subscriber.onNext(o);
}
@Override public void onError(Throwable throwable) {
try {
this.subscriber.onError(throwable);
}
finally {
cleanup();
}
}
@Override public void onComplete() {
try {
this.subscriber.onComplete();
}
finally {
cleanup();
}
}
void cleanup() {
if (compareAndSet(false, true)) {
if (log.isTraceEnabled()) {
log.trace("Cleaning up");
}
if (this.tracer.getCurrentSpan() != this.span) {
if (log.isTraceEnabled()) {
log.trace("Detaching span");
}
this.tracer.detach(this.tracer.getCurrentSpan());
this.tracer.continueSpan(this.span);
if (log.isTraceEnabled()) {
log.trace("Continuing span");
}
}
if (log.isTraceEnabled()) {
log.trace("Closing span");
}
this.tracer.close(this.span);
if (log.isTraceEnabled()) {
log.trace("Span closed");
}
if (this.rootSpan != null) {
this.tracer.continueSpan(this.rootSpan);
this.tracer.close(this.rootSpan);
if (log.isTraceEnabled()) {
log.trace("Closed root span");
}
}
}
}
@Override public Context currentContext() {
return this.context;
}
}

View File

@@ -0,0 +1,60 @@
package org.springframework.cloud.sleuth.reactor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.cloud.sleuth.SpanNamer;
import org.springframework.cloud.sleuth.TraceKeys;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
import org.springframework.cloud.sleuth.instrument.async.TraceableScheduledExecutorService;
import org.springframework.context.annotation.Configuration;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.context.Context;
/**
* {@link org.springframework.boot.autoconfigure.EnableAutoConfiguration Auto-configuration}
* to enable tracing of Reactor components via Spring Cloud Sleuth.
*
* @author Stephane Maldini
* @author Marcin Grzejszczak
* @since 1.3.0
*/
@Configuration
@ConditionalOnProperty(value="spring.sleuth.reactor.enabled", matchIfMissing=true)
@ConditionalOnClass(Mono.class)
@AutoConfigureAfter(TraceAutoConfiguration.class)
public class TraceReactorAutoConfiguration {
@Configuration
@ConditionalOnBean(Tracer.class)
static class TraceReactorConfiguration {
@Autowired Tracer tracer;
@Autowired TraceKeys traceKeys;
@Autowired SpanNamer spanNamer;
@PostConstruct
public void setupHooks() {
Hooks.onNewSubscriber((pub, sub) ->
new SpanSubscriber(sub, Context.from(sub), this.tracer, pub.toString()));
Schedulers.setFactory(new Schedulers.Factory() {
@Override public ScheduledExecutorService decorateScheduledExecutorService(
String schedulerType,
Supplier<? extends ScheduledExecutorService> actual) {
return new TraceableScheduledExecutorService(actual.get(),
TraceReactorConfiguration.this.tracer,
TraceReactorConfiguration.this.traceKeys,
TraceReactorConfiguration.this.spanNamer);
}
});
}
}
}

View File

@@ -0,0 +1,3 @@
# Auto Configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.sleuth.reactor.TraceReactorAutoConfiguration

View File

@@ -0,0 +1,119 @@
package org.springframework.cloud.sleuth.reactor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.sleuth.Sampler;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.sleuth.util.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import static org.assertj.core.api.BDDAssertions.then;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpanSubscriberTests.Config.class,
webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class SpanSubscriberTests {
private static final Log log = LogFactory.getLog(SpanSubscriberTests.class);
@Autowired Tracer tracer;
@Before
public void setup() {
ExceptionUtils.setFail(true);
}
@Test public void should_pass_tracing_info_when_using_reactor() {
Span span = this.tracer.createSpan("foo");
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
Publisher<Integer> traced = Flux.just(1, 2, 3);
log.info("Hello");
Flux.from(traced)
.map( d -> d + 1)
.map( d -> d + 1)
.map( (d) -> {
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
return d + 1;
})
.map( d -> d + 1)
.subscribe(System.out::println);
then(this.tracer.getCurrentSpan()).isNull();
then(spanInOperation.get().getParents().get(0)).isEqualTo(span.getSpanId());
then(ExceptionUtils.getLastException()).isNull();
}
@Test
public void should_pass_tracing_info_when_using_reactor_async() {
Span span = this.tracer.createSpan("foo");
final AtomicReference<Span> spanInOperation = new AtomicReference<>();
log.info("Hello");
Flux.just(1, 2, 3)
.publishOn(Schedulers.single())
.log("reactor.1")
.map( d -> d + 1)
.map( d -> d + 1)
.publishOn(Schedulers.newSingle("secondThread"))
.log("reactor.2")
.map( (d) -> {
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
return d + 1;
})
.map( d -> d + 1)
.blockLast();
Awaitility.await().untilAsserted(() -> {
then(spanInOperation.get().getTraceId()).isEqualTo(span.getTraceId());
then(ExceptionUtils.getLastException()).isNull();
});
then(this.tracer.getCurrentSpan()).isEqualTo(span);
this.tracer.close(span);
Span foo2 = this.tracer.createSpan("foo2");
Flux.just(1, 2, 3)
.publishOn(Schedulers.single())
.log("reactor.")
.map( d -> d + 1)
.map( d -> d + 1)
.map( (d) -> {
spanInOperation.set(SpanSubscriberTests.this.tracer.getCurrentSpan());
return d + 1;
})
.map( d -> d + 1)
.blockLast();
then(this.tracer.getCurrentSpan()).isEqualTo(foo2);
then(ExceptionUtils.getLastException()).isNull();
// parent cause there's an async span in the meantime
then(spanInOperation.get().getSavedSpan().getParents().get(0)).isEqualTo(foo2.getSpanId());
tracer.close(foo2);
}
@EnableAutoConfiguration
@Configuration
static class Config {
@Bean Sampler sampler() {
return new AlwaysSampler();
}
}
}

View File

@@ -0,0 +1 @@
logging.level.org.springframework.cloud.sleuth.reactor=TRACE