Fixes missing zipkin service name and polishes span converters.
Missing service name: Zipkin service names were logged as null, which is invalid and led to them showing up as "unknown" in the zipkin ui. This was due to a wiring bug, and a special-case, which this change fixes. The special-case was when a sleuth span had no annotations. Since zipkin service names are attached to annotations, they are only queryable when annotations exist. When there are no annotations, we add the "lc" binary annotation, which makes that span attached to the correct service in zipkin. Polishing: Zipkin timestamps were not always set as microseconds. This fixes that. The de-facto label in zipkin for unknown service is "unknown". This fixes the code, which formerly fell back to "application". This also removes complexity in assigning timestamp and duration as we no longer need to make pseudo-annotations "acquire" and "release". Finally, this adds tests about above consistently to both scs-zipkin and scs-zipkin-stream.
This commit is contained in:
@@ -50,6 +50,12 @@
|
||||
<!-- Only needed at compile time -->
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>2.1.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.sleuth.stream;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
@@ -28,15 +27,14 @@ import org.springframework.context.event.EventListener;
|
||||
*/
|
||||
public class ServerPropertiesHostLocator implements HostLocator {
|
||||
|
||||
@Value("${spring.application.name:application}")
|
||||
private String appName;
|
||||
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
private final ServerProperties serverProperties;
|
||||
private final String appName;
|
||||
private Integer port;
|
||||
|
||||
public ServerPropertiesHostLocator(ServerProperties serverProperties) {
|
||||
public ServerPropertiesHostLocator(ServerProperties serverProperties,
|
||||
String appName) {
|
||||
this.serverProperties = serverProperties;
|
||||
this.appName = appName;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -80,7 +78,7 @@ public class ServerPropertiesHostLocator implements HostLocator {
|
||||
|
||||
private String getServiceName(Span span) {
|
||||
String serviceName;
|
||||
if (span.getProcessId() != null) {
|
||||
if (span.getProcessId() != null) { // TODO: javadocs say this isn't nullable!
|
||||
serviceName = span.getProcessId().toLowerCase();
|
||||
}
|
||||
else {
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package org.springframework.cloud.sleuth.stream;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
@@ -77,9 +78,12 @@ public class SleuthStreamAutoConfiguration {
|
||||
@Autowired(required = false)
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
@Value("${spring.application.name:unknown}")
|
||||
private String appName;
|
||||
|
||||
@Bean
|
||||
public HostLocator zipkinEndpointLocator() {
|
||||
return new ServerPropertiesHostLocator(this.serverProperties);
|
||||
return new ServerPropertiesHostLocator(this.serverProperties, this.appName);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -91,6 +95,9 @@ public class SleuthStreamAutoConfiguration {
|
||||
@Autowired(required = false)
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
@Value("${spring.application.name:unknown}")
|
||||
private String appName;
|
||||
|
||||
@Autowired(required = false)
|
||||
private DiscoveryClient client;
|
||||
|
||||
@@ -99,7 +106,7 @@ public class SleuthStreamAutoConfiguration {
|
||||
if (this.client != null) {
|
||||
return new DiscoveryClientHostLocator(this.client);
|
||||
}
|
||||
return new ServerPropertiesHostLocator(this.serverProperties);
|
||||
return new ServerPropertiesHostLocator(this.serverProperties, this.appName);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.sleuth.stream;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.Collections;
|
||||
import org.junit.Test;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.cloud.sleuth.MilliSpan;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class ServerPropertiesHostLocatorTests {
|
||||
MilliSpan span = new MilliSpan(1, 3, "name", "traceId", Collections.<String>emptyList(), "spanId", true, true, "processId");
|
||||
|
||||
@Test
|
||||
public void portDefaultsTo8080() {
|
||||
ServerPropertiesHostLocator locator = new ServerPropertiesHostLocator(new ServerProperties(), "unknown");
|
||||
|
||||
assertThat(locator.locate(span).getPort()).isEqualTo((short) 8080);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void portFromServerProperties() {
|
||||
ServerProperties properties = new ServerProperties();
|
||||
properties.setPort(1234);
|
||||
|
||||
ServerPropertiesHostLocator locator = new ServerPropertiesHostLocator(properties, "unknown");
|
||||
|
||||
assertThat(locator.locate(span).getPort()).isEqualTo((short) 1234);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void portDefaultsToLocalhost() {
|
||||
ServerPropertiesHostLocator locator = new ServerPropertiesHostLocator(new ServerProperties(), "unknown");
|
||||
|
||||
assertThat(locator.locate(span).getAddress())
|
||||
.isEqualTo("127.0.0.1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hostFromServerPropertiesIp() throws UnknownHostException {
|
||||
ServerProperties properties = new ServerProperties();
|
||||
properties.setAddress(InetAddress.getByAddress(new byte[]{1, 2, 3, 4}));
|
||||
|
||||
ServerPropertiesHostLocator locator = new ServerPropertiesHostLocator(properties, "unknown");
|
||||
|
||||
assertThat(locator.locate(span).getAddress())
|
||||
.isEqualTo("1.2.3.4");
|
||||
}
|
||||
}
|
||||
@@ -81,6 +81,12 @@
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>2.1.0</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-stream-test-support</artifactId>
|
||||
|
||||
@@ -37,6 +37,7 @@ import org.springframework.util.StringUtils;
|
||||
import io.zipkin.Annotation;
|
||||
import io.zipkin.BinaryAnnotation;
|
||||
import io.zipkin.BinaryAnnotation.Type;
|
||||
import io.zipkin.Constants;
|
||||
import io.zipkin.Endpoint;
|
||||
import io.zipkin.Span.Builder;
|
||||
import io.zipkin.SpanStore;
|
||||
@@ -74,14 +75,29 @@ public class ZipkinMessageListener {
|
||||
* <li>Create binary annotations based on data from Span object.
|
||||
* </ul>
|
||||
*/
|
||||
public io.zipkin.Span convert(Span span, Host host) {
|
||||
// VisibleForTesting
|
||||
static io.zipkin.Span convert(Span span, Host host) {
|
||||
Builder zipkinSpan = new io.zipkin.Span.Builder();
|
||||
|
||||
Endpoint ep = Endpoint.create(host.getServiceName(), host.getIpv4(),
|
||||
host.getPort().shortValue());
|
||||
List<Annotation> annotationList = createZipkinAnnotations(span, ep);
|
||||
List<BinaryAnnotation> binaryAnnotationList = createZipkinBinaryAnnotations(span,
|
||||
ep);
|
||||
|
||||
// A zipkin span without any annotations cannot be queried, add special "lc" to avoid that.
|
||||
if (span.getTimelineAnnotations().isEmpty() && span.getAnnotations().isEmpty()) {
|
||||
// TODO: javadocs say this isn't nullable!
|
||||
String processId = span.getProcessId() != null
|
||||
? span.getProcessId().toLowerCase()
|
||||
: "unknown";
|
||||
zipkinSpan.addBinaryAnnotation(
|
||||
BinaryAnnotation.create(Constants.LOCAL_COMPONENT, processId, ep)
|
||||
);
|
||||
} else {
|
||||
addZipkinAnnotations(zipkinSpan, span, ep);
|
||||
addZipkinBinaryAnnotations(zipkinSpan, span, ep);
|
||||
}
|
||||
|
||||
zipkinSpan.timestamp(span.getBegin() * 1000);
|
||||
zipkinSpan.duration((span.getEnd() - span.getBegin()) * 1000);
|
||||
zipkinSpan.traceId(hash(span.getTraceId()));
|
||||
if (span.getParents().size() > 0) {
|
||||
if (span.getParents().size() > 1) {
|
||||
@@ -94,26 +110,21 @@ public class ZipkinMessageListener {
|
||||
if (StringUtils.hasText(span.getName())) {
|
||||
zipkinSpan.name(span.getName());
|
||||
}
|
||||
for (Annotation annotation : annotationList) {
|
||||
zipkinSpan.addAnnotation(annotation);
|
||||
}
|
||||
for (BinaryAnnotation annotation : binaryAnnotationList) {
|
||||
zipkinSpan.addBinaryAnnotation(annotation);
|
||||
}
|
||||
return zipkinSpan.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add annotations from the sleuth Span.
|
||||
*/
|
||||
private List<Annotation> createZipkinAnnotations(Span span, Endpoint endpoint) {
|
||||
List<Annotation> annotationList = new ArrayList<>();
|
||||
private static void addZipkinAnnotations(Builder zipkinSpan, Span span, Endpoint endpoint) {
|
||||
for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
|
||||
Annotation zipkinAnnotation = createZipkinAnnotation(ta.getMsg(),
|
||||
ta.getTime(), endpoint, true);
|
||||
annotationList.add(zipkinAnnotation);
|
||||
Annotation zipkinAnnotation = new Annotation.Builder()
|
||||
.endpoint(endpoint)
|
||||
.timestamp(ta.getTime() * 1000) // Zipkin is in microseconds
|
||||
.value(ta.getMsg())
|
||||
.build();
|
||||
zipkinSpan.addAnnotation(zipkinAnnotation);
|
||||
}
|
||||
return annotationList;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -121,9 +132,8 @@ public class ZipkinMessageListener {
|
||||
*
|
||||
* @return list of Annotations that could be added to Zipkin Span.
|
||||
*/
|
||||
private List<BinaryAnnotation> createZipkinBinaryAnnotations(Span span,
|
||||
private static void addZipkinBinaryAnnotations(Builder zipkinSpan, Span span,
|
||||
Endpoint endpoint) {
|
||||
List<BinaryAnnotation> l = new ArrayList<>();
|
||||
for (Map.Entry<String, String> e : span.getAnnotations().entrySet()) {
|
||||
BinaryAnnotation.Builder binaryAnn = new BinaryAnnotation.Builder();
|
||||
binaryAnn.type(Type.STRING);
|
||||
@@ -135,33 +145,8 @@ public class ZipkinMessageListener {
|
||||
log.error("Error encoding string as UTF-8", ex);
|
||||
}
|
||||
binaryAnn.endpoint(endpoint);
|
||||
l.add(binaryAnn.build());
|
||||
zipkinSpan.addBinaryAnnotation(binaryAnn.build());
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an annotation with the correct times and endpoint.
|
||||
*
|
||||
* @param value Annotation value
|
||||
* @param time timestamp will be extracted
|
||||
* @param endpoint the endpoint this annotation will be associated with.
|
||||
* @param sendRequest use the first or last timestamp.
|
||||
*/
|
||||
private static Annotation createZipkinAnnotation(String value, long time,
|
||||
Endpoint endpoint, boolean sendRequest) {
|
||||
Annotation.Builder annotation = new Annotation.Builder();
|
||||
annotation.endpoint(endpoint);
|
||||
|
||||
// Zipkin is in microseconds
|
||||
if (sendRequest) {
|
||||
annotation.timestamp(time * 1000);
|
||||
}
|
||||
else {
|
||||
annotation.timestamp(time * 1000);
|
||||
}
|
||||
annotation.value(value);
|
||||
return annotation.build();
|
||||
}
|
||||
|
||||
private static long hash(String string) {
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
/*
|
||||
* Copyright 2013-2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin.stream;
|
||||
|
||||
import io.zipkin.BinaryAnnotation;
|
||||
import io.zipkin.Endpoint;
|
||||
import java.util.Collections;
|
||||
import org.junit.Test;
|
||||
import org.springframework.cloud.sleuth.MilliSpan;
|
||||
import org.springframework.cloud.sleuth.stream.Host;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class ZipkinMessageListenerTests {
|
||||
MilliSpan span = new MilliSpan(1, 3, "name", "traceId", Collections.<String>emptyList(), "spanId", true, true, "processId");
|
||||
Host host = new Host("myservice", "1.2.3.4", 8080);
|
||||
Endpoint endpoint = Endpoint.create("myservice", 1 << 24 | 2 << 16 | 3 << 8 | 4, 8080);
|
||||
|
||||
/** Sleuth timestamps are millisecond granularity while zipkin is microsecond. */
|
||||
@Test
|
||||
public void convertsTimestampAndDurationToMicroseconds() {
|
||||
long start = System.currentTimeMillis();
|
||||
span.addTimelineAnnotation("http/request/retry"); // System.currentTimeMillis
|
||||
|
||||
io.zipkin.Span result = ZipkinMessageListener.convert(span, host);
|
||||
|
||||
assertThat(result.timestamp)
|
||||
.isEqualTo(span.getBegin() * 1000);
|
||||
assertThat(result.duration)
|
||||
.isEqualTo((span.getEnd() - span.getBegin()) * 1000);
|
||||
assertThat(result.annotations.get(0).timestamp)
|
||||
.isGreaterThanOrEqualTo(start * 1000)
|
||||
.isLessThanOrEqualTo(System.currentTimeMillis() * 1000);
|
||||
}
|
||||
|
||||
/** Sleuth host corresponds to annotation/binaryAnnotation.host in zipkin. */
|
||||
@Test
|
||||
public void annotationsIncludeHost() {
|
||||
span.addTimelineAnnotation("http/request/retry");
|
||||
span.addAnnotation("spring-boot/version", "1.3.1.RELEASE");
|
||||
|
||||
io.zipkin.Span result = ZipkinMessageListener.convert(span, host);
|
||||
|
||||
assertThat(result.annotations.get(0).endpoint)
|
||||
.isEqualTo(endpoint);
|
||||
assertThat(result.binaryAnnotations.get(0).endpoint)
|
||||
.isEqualTo(result.annotations.get(0).endpoint);
|
||||
}
|
||||
|
||||
/**
|
||||
* In zipkin, the service context is attached to annotations. Sleuth spans
|
||||
* that have no annotations will get an "lc" one, which allows them to be
|
||||
* queryable in zipkin by service name.
|
||||
*/
|
||||
@Test
|
||||
public void spanWithoutAnnotationsLogsComponent() {
|
||||
io.zipkin.Span result = ZipkinMessageListener.convert(span, host);
|
||||
|
||||
assertThat(result.binaryAnnotations)
|
||||
.containsOnly(BinaryAnnotation.create("lc", span.getProcessId(), endpoint));
|
||||
}
|
||||
|
||||
// TODO: "unknown" bc process id, documented as not nullable, is null in some tests.
|
||||
@Test
|
||||
public void nullProcessIdCoercesToUnknownServiceName() {
|
||||
MilliSpan noProcessId = MilliSpan.builder().traceId("xxxx").name("parent").remote(true).build();
|
||||
|
||||
io.zipkin.Span result = ZipkinMessageListener.convert(noProcessId, host);
|
||||
|
||||
assertThat(result.binaryAnnotations)
|
||||
.containsOnly(BinaryAnnotation.create("lc", "unknown", endpoint));
|
||||
}
|
||||
}
|
||||
@@ -16,7 +16,6 @@
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.boot.context.embedded.EmbeddedServletContainerInitializedEvent;
|
||||
import org.springframework.cloud.util.InetUtils;
|
||||
@@ -30,15 +29,14 @@ import com.twitter.zipkin.gen.Endpoint;
|
||||
*/
|
||||
public class ServerPropertiesEndpointLocator implements EndpointLocator {
|
||||
|
||||
@Value("${spring.application.name:application}")
|
||||
private String appName;
|
||||
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
private final ServerProperties serverProperties;
|
||||
private final String appName;
|
||||
private Integer port;
|
||||
|
||||
public ServerPropertiesEndpointLocator(ServerProperties serverProperties) {
|
||||
public ServerPropertiesEndpointLocator(ServerProperties serverProperties,
|
||||
String appName) {
|
||||
this.serverProperties = serverProperties;
|
||||
this.appName = appName;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
package org.springframework.cloud.sleuth.zipkin;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
@@ -65,9 +66,12 @@ public class ZipkinAutoConfiguration {
|
||||
@Autowired(required=false)
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
@Value("${spring.application.name:unknown}")
|
||||
private String appName;
|
||||
|
||||
@Bean
|
||||
public EndpointLocator zipkinEndpointLocator() {
|
||||
return new ServerPropertiesEndpointLocator(this.serverProperties);
|
||||
return new ServerPropertiesEndpointLocator(this.serverProperties, this.appName);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -79,13 +83,16 @@ public class ZipkinAutoConfiguration {
|
||||
@Autowired(required=false)
|
||||
private ServerProperties serverProperties;
|
||||
|
||||
@Value("${spring.application.name:unknown}")
|
||||
private String appName;
|
||||
|
||||
@Autowired(required=false)
|
||||
private DiscoveryClient client;
|
||||
|
||||
@Bean
|
||||
public EndpointLocator zipkinEndpointLocator() {
|
||||
return new FallbackHavingEndpointLocator(discoveryClientEndpointLocator(),
|
||||
new ServerPropertiesEndpointLocator(this.serverProperties));
|
||||
new ServerPropertiesEndpointLocator(this.serverProperties, this.appName));
|
||||
}
|
||||
|
||||
private DiscoveryClientEndpointLocator discoveryClientEndpointLocator() {
|
||||
|
||||
@@ -16,9 +16,7 @@
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
@@ -47,13 +45,16 @@ import lombok.extern.apachecommons.CommonsLog;
|
||||
*/
|
||||
@CommonsLog
|
||||
public class ZipkinSpanListener {
|
||||
private static final Charset UTF_8 = Charset.forName("UTF-8");
|
||||
private static final byte[] UNKNOWN_BYTES = "unknown".getBytes(UTF_8);
|
||||
|
||||
private SpanCollector spanCollector;
|
||||
/**
|
||||
* Endpoint is the visible IP address of this service, the port it is listening on and
|
||||
* the service name from discovery.
|
||||
*/
|
||||
private Endpoint localEndpoint;
|
||||
// Visible for testing
|
||||
Endpoint localEndpoint;
|
||||
|
||||
public ZipkinSpanListener(SpanCollector spanCollector, Endpoint localEndpoint) {
|
||||
this.spanCollector = spanCollector;
|
||||
@@ -63,9 +64,8 @@ public class ZipkinSpanListener {
|
||||
@EventListener
|
||||
@Order(0)
|
||||
public void start(SpanAcquiredEvent event) {
|
||||
// Starting a span in zipkin means adding: traceId, id, parentId(optional), and
|
||||
// timestamp
|
||||
event.getSpan().addTimelineAnnotation("acquire");
|
||||
// Zipkin Span.timestamp corresponds with Sleuth's Span.begin
|
||||
assert event.getSpan().getBegin() != 0;
|
||||
}
|
||||
|
||||
@EventListener
|
||||
@@ -107,7 +107,8 @@ public class ZipkinSpanListener {
|
||||
@Order(0)
|
||||
public void release(SpanReleasedEvent event) {
|
||||
// Ending a span in zipkin means adding duration and sending it out
|
||||
event.getSpan().addTimelineAnnotation("release");
|
||||
// Zipkin Span.duration corresponds with Sleuth's Span.begin and end
|
||||
assert event.getSpan().getEnd() != 0;
|
||||
if (event.getSpan().isExportable()) {
|
||||
this.spanCollector.collect(convert(event.getSpan()));
|
||||
}
|
||||
@@ -121,12 +122,29 @@ public class ZipkinSpanListener {
|
||||
* <li>Create binary annotations based on data from Span object.
|
||||
* </ul>
|
||||
*/
|
||||
public com.twitter.zipkin.gen.Span convert(Span span) {
|
||||
// Visible for testing
|
||||
com.twitter.zipkin.gen.Span convert(Span span) {
|
||||
com.twitter.zipkin.gen.Span zipkinSpan = new com.twitter.zipkin.gen.Span();
|
||||
addZipkinAnnotations(zipkinSpan, span, this.localEndpoint);
|
||||
List<BinaryAnnotation> binaryAnnotationList = createZipkinBinaryAnnotations(span,
|
||||
this.localEndpoint);
|
||||
zipkinSpan.setDuration(span.getEnd() - span.getBegin());
|
||||
|
||||
// A zipkin span without any annotations cannot be queried, add special "lc" to avoid that.
|
||||
if (span.getTimelineAnnotations().isEmpty() && span.getAnnotations().isEmpty()) {
|
||||
// TODO: javadocs say this isn't nullable!
|
||||
byte[] processId = span.getProcessId() != null
|
||||
? span.getProcessId().toLowerCase().getBytes(UTF_8)
|
||||
: UNKNOWN_BYTES;
|
||||
BinaryAnnotation component = new BinaryAnnotation()
|
||||
.setAnnotation_type(AnnotationType.STRING)
|
||||
.setKey("lc") // LOCAL_COMPONENT
|
||||
.setValue(processId)
|
||||
.setHost(this.localEndpoint);
|
||||
zipkinSpan.addToBinary_annotations(component);
|
||||
} else {
|
||||
addZipkinAnnotations(zipkinSpan, span, this.localEndpoint);
|
||||
addZipkinBinaryAnnotations(zipkinSpan, span, this.localEndpoint);
|
||||
}
|
||||
|
||||
zipkinSpan.setTimestamp(span.getBegin() * 1000L);
|
||||
zipkinSpan.setDuration((span.getEnd() - span.getBegin()) * 1000L);
|
||||
zipkinSpan.setTrace_id(hash(span.getTraceId()));
|
||||
if (span.getParents().size() > 0) {
|
||||
if (span.getParents().size() > 1) {
|
||||
@@ -139,35 +157,20 @@ public class ZipkinSpanListener {
|
||||
if (StringUtils.hasText(span.getName())) {
|
||||
zipkinSpan.setName(span.getName());
|
||||
}
|
||||
zipkinSpan.setBinary_annotations(binaryAnnotationList);
|
||||
return zipkinSpan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add annotations from the sleuth Span.
|
||||
*/
|
||||
private void addZipkinAnnotations(com.twitter.zipkin.gen.Span zipkinSpan, Span span,
|
||||
Endpoint endpoint) {
|
||||
Long startTs = null;
|
||||
Long endTs = null;
|
||||
private void addZipkinAnnotations(com.twitter.zipkin.gen.Span zipkinSpan,
|
||||
Span span, Endpoint endpoint) {
|
||||
for (TimelineAnnotation ta : span.getTimelineAnnotations()) {
|
||||
Annotation zipkinAnnotation = createZipkinAnnotation(ta.getMsg(),
|
||||
ta.getTime(), endpoint);
|
||||
if (zipkinAnnotation.getValue().equals("acquire")) {
|
||||
startTs = zipkinAnnotation.getTimestamp();
|
||||
}
|
||||
else if (zipkinAnnotation.getValue().equals("release")) {
|
||||
endTs = zipkinAnnotation.getTimestamp();
|
||||
}
|
||||
else {
|
||||
zipkinSpan.addToAnnotations(zipkinAnnotation);
|
||||
}
|
||||
}
|
||||
if (startTs != null) {
|
||||
zipkinSpan.setTimestamp(startTs);
|
||||
if (endTs != null) {
|
||||
zipkinSpan.setDuration(endTs - zipkinSpan.getTimestamp());
|
||||
}
|
||||
Annotation zipkinAnnotation = new Annotation()
|
||||
.setHost(endpoint)
|
||||
.setTimestamp(ta.getTime() * 1000) // Zipkin is in microseconds
|
||||
.setValue(ta.getMsg());
|
||||
zipkinSpan.addToAnnotations(zipkinAnnotation);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -176,41 +179,16 @@ public class ZipkinSpanListener {
|
||||
*
|
||||
* @return list of Annotations that could be added to Zipkin Span.
|
||||
*/
|
||||
private List<BinaryAnnotation> createZipkinBinaryAnnotations(Span span,
|
||||
Endpoint endpoint) {
|
||||
List<BinaryAnnotation> l = new ArrayList<>();
|
||||
private void addZipkinBinaryAnnotations(com.twitter.zipkin.gen.Span zipkinSpan,
|
||||
Span span, Endpoint endpoint) {
|
||||
for (Map.Entry<String, String> e : span.getAnnotations().entrySet()) {
|
||||
BinaryAnnotation binaryAnn = new BinaryAnnotation();
|
||||
binaryAnn.setAnnotation_type(AnnotationType.STRING);
|
||||
binaryAnn.setKey(e.getKey());
|
||||
try {
|
||||
binaryAnn.setValue(e.getValue().getBytes("UTF-8"));
|
||||
}
|
||||
catch (UnsupportedEncodingException ex) {
|
||||
log.error("Error encoding string as UTF-8", ex);
|
||||
}
|
||||
binaryAnn.setHost(endpoint);
|
||||
l.add(binaryAnn);
|
||||
BinaryAnnotation binaryAnn = new BinaryAnnotation()
|
||||
.setAnnotation_type(AnnotationType.STRING)
|
||||
.setKey(e.getKey())
|
||||
.setValue(e.getValue().getBytes(UTF_8))
|
||||
.setHost(endpoint);
|
||||
zipkinSpan.addToBinary_annotations(binaryAnn);
|
||||
}
|
||||
return l;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an annotation with the correct times and endpoint.
|
||||
*
|
||||
* @param value Annotation value
|
||||
* @param time timestamp will be extracted
|
||||
* @param endpoint the endpoint this annotation will be associated with.
|
||||
*/
|
||||
private static Annotation createZipkinAnnotation(String value, long time,
|
||||
Endpoint endpoint) {
|
||||
Annotation annotation = new Annotation();
|
||||
annotation.setHost(endpoint);
|
||||
|
||||
// Zipkin is in microseconds
|
||||
annotation.setTimestamp(time * 1000);
|
||||
annotation.setValue(value);
|
||||
return annotation;
|
||||
}
|
||||
|
||||
private static long hash(String string) {
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2015 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.UnknownHostException;
|
||||
import org.junit.Test;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class ServerPropertiesEndpointLocatorTests {
|
||||
|
||||
@Test
|
||||
public void portDefaultsTo8080() {
|
||||
ServerPropertiesEndpointLocator locator = new ServerPropertiesEndpointLocator(new ServerProperties(), "unknown");
|
||||
|
||||
assertThat(locator.local().port).isEqualTo((short) 8080);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void portFromServerProperties() {
|
||||
ServerProperties properties = new ServerProperties();
|
||||
properties.setPort(1234);
|
||||
|
||||
ServerPropertiesEndpointLocator locator = new ServerPropertiesEndpointLocator(properties, "unknown");
|
||||
|
||||
assertThat(locator.local().port).isEqualTo((short) 1234);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void portDefaultsToLocalhost() {
|
||||
ServerPropertiesEndpointLocator locator = new ServerPropertiesEndpointLocator(new ServerProperties(), "unknown");
|
||||
|
||||
assertThat(locator.local().ipv4)
|
||||
.isEqualTo(127 << 24 | 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void hostFromServerPropertiesIp() throws UnknownHostException {
|
||||
ServerProperties properties = new ServerProperties();
|
||||
properties.setAddress(InetAddress.getByAddress(new byte[]{1, 2, 3, 4}));
|
||||
|
||||
ServerPropertiesEndpointLocator locator = new ServerPropertiesEndpointLocator(properties, "unknown");
|
||||
|
||||
assertThat(locator.local().ipv4)
|
||||
.isEqualTo(1 << 24 | 2 << 16 | 3 << 8 | 4);
|
||||
}
|
||||
}
|
||||
@@ -16,9 +16,12 @@
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import com.twitter.zipkin.gen.Endpoint;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
@@ -66,21 +69,71 @@ public class ZipkinSpanListenerTests {
|
||||
@Autowired
|
||||
private ZipkinTestConfiguration test;
|
||||
|
||||
@Autowired
|
||||
private ZipkinSpanListener listener;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.test.spans.clear();
|
||||
}
|
||||
|
||||
Span parent = MilliSpan.builder().traceId("xxxx").name("parent").remote(true).build();
|
||||
|
||||
/** Sleuth timestamps are millisecond granularity while zipkin is microsecond. */
|
||||
@Test
|
||||
public void acquireAndRelease() {
|
||||
public void convertsTimestampAndDurationToMicroseconds() {
|
||||
long start = System.currentTimeMillis();
|
||||
parent.addTimelineAnnotation("http/request/retry"); // System.currentTimeMillis
|
||||
|
||||
com.twitter.zipkin.gen.Span result = listener.convert(parent);
|
||||
|
||||
assertThat(result.timestamp)
|
||||
.isEqualTo(parent.getBegin() * 1000);
|
||||
assertThat(result.duration)
|
||||
.isEqualTo((parent.getEnd() - parent.getBegin()) * 1000);
|
||||
assertThat(result.annotations.get(0).timestamp)
|
||||
.isGreaterThanOrEqualTo(start * 1000)
|
||||
.isLessThanOrEqualTo(System.currentTimeMillis() * 1000);
|
||||
}
|
||||
|
||||
/** Sleuth host corresponds to annotation/binaryAnnotation.host in zipkin. */
|
||||
@Test
|
||||
public void annotationsIncludeHost() {
|
||||
parent.addTimelineAnnotation("http/request/retry");
|
||||
parent.addAnnotation("spring-boot/version", "1.3.1.RELEASE");
|
||||
|
||||
com.twitter.zipkin.gen.Span result = listener.convert(parent);
|
||||
|
||||
assertThat(result.annotations.get(0).host)
|
||||
.isEqualTo(listener.localEndpoint);
|
||||
assertThat(result.binary_annotations.get(0).host)
|
||||
.isEqualTo(result.annotations.get(0).host);
|
||||
}
|
||||
|
||||
/** zipkin's Endpoint.serviceName should never be null. */
|
||||
@Test
|
||||
public void localEndpointIncludesServiceName() {
|
||||
assertThat(listener.localEndpoint.service_name)
|
||||
.isNotEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* In zipkin, the service context is attached to annotations. Sleuth spans
|
||||
* that have no annotations will get an "lc" one, which allows them to be
|
||||
* queryable in zipkin by service name.
|
||||
*/
|
||||
@Test
|
||||
public void spanWithoutAnnotationsLogsComponent() {
|
||||
Trace context = this.traceManager.startSpan("foo");
|
||||
this.traceManager.close(context);
|
||||
assertEquals(1, this.test.spans.size());
|
||||
assertThat(this.test.spans.get(0).getBinary_annotations())
|
||||
.extracting("host.service_name")
|
||||
.isEqualTo("unknown"); // TODO: "unknown" bc process id, documented as not nullable, is null.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rpcAnnotations() {
|
||||
Span parent = MilliSpan.builder().traceId("xxxx").name("parent").remote(true).build();
|
||||
Trace context = this.traceManager.startSpan("child", parent);
|
||||
this.application.publishEvent(new ClientSentEvent(this, context.getSpan()));
|
||||
this.application.publishEvent(new ServerReceivedEvent(this, parent, context.getSpan()));
|
||||
|
||||
Reference in New Issue
Block a user