Adds auto-configuration for RabbitMQ and Kafka transports
Add one of the following to auto-configure a messaging transport: * spring-kakfa 2.0+ * spring-rabbit < 2.0 (unless amqp-client 5.x is compatible w/ 4.x)
This commit is contained in:
@@ -14,9 +14,9 @@
|
||||
<name>spring-cloud-sleuth-dependencies</name>
|
||||
<description>Spring Cloud Sleuth Dependencies</description>
|
||||
<properties>
|
||||
<zipkin.version>2.2.0</zipkin.version>
|
||||
<zipkin-reporter.version>1.1.1</zipkin-reporter.version>
|
||||
<zipkin-reporter2.version>2.1.1</zipkin-reporter2.version>
|
||||
<zipkin.version>2.2.1</zipkin.version>
|
||||
<zipkin-reporter.version>1.1.2</zipkin-reporter.version>
|
||||
<zipkin-reporter2.version>2.1.3</zipkin-reporter2.version>
|
||||
</properties>
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
@@ -105,6 +105,30 @@
|
||||
<artifactId>zipkin-reporter</artifactId>
|
||||
<version>${zipkin-reporter2.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.reporter2</groupId>
|
||||
<artifactId>zipkin-sender-kafka11</artifactId>
|
||||
<version>${zipkin-reporter2.version}</version>
|
||||
<exclusions>
|
||||
<!-- assigned with spring-kafka -->
|
||||
<exclusion>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.reporter2</groupId>
|
||||
<artifactId>zipkin-sender-amqp-client</artifactId>
|
||||
<version>${zipkin-reporter2.version}</version>
|
||||
<exclusions>
|
||||
<!-- assigned with spring-rabbit -->
|
||||
<exclusion>
|
||||
<groupId>com.rabbitmq</groupId>
|
||||
<artifactId>amqp-client</artifactId>
|
||||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
<profiles>
|
||||
|
||||
@@ -59,17 +59,17 @@
|
||||
<dependency>
|
||||
<groupId>io.zipkin.java</groupId>
|
||||
<artifactId>zipkin</artifactId>
|
||||
<version>2.2.0</version>
|
||||
<version>2.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.zipkin2</groupId>
|
||||
<artifactId>zipkin</artifactId>
|
||||
<version>2.2.0</version>
|
||||
<version>2.2.1</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.java</groupId>
|
||||
<artifactId>zipkin-server</artifactId>
|
||||
<version>2.2.0</version>
|
||||
<version>2.2.1</version>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</dependencyManagement>
|
||||
|
||||
@@ -68,6 +68,24 @@
|
||||
<groupId>io.zipkin.reporter2</groupId>
|
||||
<artifactId>zipkin-reporter</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.reporter2</groupId>
|
||||
<artifactId>zipkin-sender-kafka11</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.kafka</groupId>
|
||||
<artifactId>spring-kafka</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>io.zipkin.reporter2</groupId>
|
||||
<artifactId>zipkin-sender-amqp-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.amqp</groupId>
|
||||
<artifactId>spring-rabbit</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework</groupId>
|
||||
<artifactId>spring-messaging</artifactId>
|
||||
|
||||
@@ -16,25 +16,17 @@
|
||||
|
||||
package org.springframework.cloud.sleuth.zipkin2;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
||||
import org.springframework.cloud.client.serviceregistry.Registration;
|
||||
import org.springframework.cloud.commons.util.InetUtils;
|
||||
import org.springframework.cloud.sleuth.Sampler;
|
||||
@@ -44,13 +36,11 @@ import org.springframework.cloud.sleuth.autoconfig.TraceAutoConfiguration;
|
||||
import org.springframework.cloud.sleuth.metric.SpanMetricReporter;
|
||||
import org.springframework.cloud.sleuth.sampler.PercentageBasedSampler;
|
||||
import org.springframework.cloud.sleuth.sampler.SamplerProperties;
|
||||
import org.springframework.cloud.sleuth.zipkin2.sender.ZipkinSenderConfigurationImportSelector;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.web.client.RequestCallback;
|
||||
import org.springframework.web.client.ResponseExtractor;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import zipkin2.Span;
|
||||
import zipkin2.reporter.AsyncReporter;
|
||||
@@ -77,10 +67,10 @@ import zipkin2.reporter.Sender;
|
||||
@EnableConfigurationProperties({ZipkinProperties.class, SamplerProperties.class})
|
||||
@ConditionalOnProperty(value = "spring.zipkin.enabled", matchIfMissing = true)
|
||||
@AutoConfigureBefore(TraceAutoConfiguration.class)
|
||||
@Import(ZipkinSenderConfigurationImportSelector.class)
|
||||
public class ZipkinAutoConfiguration {
|
||||
|
||||
@Autowired(required = false) List<SpanAdjuster> spanAdjusters = new ArrayList<>();
|
||||
@Autowired ZipkinUrlExtractor extractor;
|
||||
|
||||
/**
|
||||
* Accepts a sender so you can plug-in any standard one. Returns a Reporter so you can also
|
||||
@@ -100,55 +90,6 @@ public class ZipkinAutoConfiguration {
|
||||
.build(zipkin.getEncoder());
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public Sender restTemplateSender(ZipkinProperties zipkin,
|
||||
ZipkinRestTemplateCustomizer zipkinRestTemplateCustomizer) {
|
||||
RestTemplate restTemplate = new ZipkinRestTemplateWrapper(zipkin, this.extractor);
|
||||
zipkinRestTemplateCustomizer.customize(restTemplate);
|
||||
return new RestTemplateSender(restTemplate, zipkin.getBaseUrl(), zipkin.getEncoder());
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(DiscoveryClient.class)
|
||||
static class DiscoveryClientZipkinUrlExtractorConfiguration {
|
||||
|
||||
@Autowired(required = false) DiscoveryClient discoveryClient;
|
||||
|
||||
@Bean
|
||||
ZipkinUrlExtractor zipkinUrlExtractor() {
|
||||
final DiscoveryClient discoveryClient = this.discoveryClient;
|
||||
return new ZipkinUrlExtractor() {
|
||||
@Override
|
||||
public URI zipkinUrl(ZipkinProperties zipkinProperties) {
|
||||
if (discoveryClient != null) {
|
||||
URI uri = URI.create(zipkinProperties.getBaseUrl());
|
||||
String host = uri.getHost();
|
||||
List<ServiceInstance> instances = discoveryClient.getInstances(host);
|
||||
if (!instances.isEmpty()) {
|
||||
return instances.get(0).getUri();
|
||||
}
|
||||
}
|
||||
return URI.create(zipkinProperties.getBaseUrl());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnMissingClass("org.springframework.cloud.client.discovery.DiscoveryClient")
|
||||
static class DefaultZipkinUrlExtractorConfiguration {
|
||||
@Bean
|
||||
ZipkinUrlExtractor zipkinUrlExtractor() {
|
||||
return new ZipkinUrlExtractor() {
|
||||
@Override
|
||||
public URI zipkinUrl(ZipkinProperties zipkinProperties) {
|
||||
return URI.create(zipkinProperties.getBaseUrl());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public ZipkinRestTemplateCustomizer zipkinRestTemplateCustomizer(ZipkinProperties zipkinProperties) {
|
||||
@@ -164,7 +105,7 @@ public class ZipkinAutoConfiguration {
|
||||
@Bean
|
||||
public SpanReporter zipkinSpanListener(Reporter<Span> reporter, EndpointLocator endpointLocator,
|
||||
Environment environment) {
|
||||
return new ZipkinSpanListener(reporter, endpointLocator, environment, this.spanAdjusters);
|
||||
return new ZipkinSpanReporter(reporter, endpointLocator, environment, this.spanAdjusters);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@@ -220,52 +161,3 @@ public class ZipkinAutoConfiguration {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal interface to provide a way to retrieve Zipkin URI. If there's no discovery client
|
||||
* then this value will be taken from the properties. Otherwise host will be assumed to
|
||||
* be a service id.
|
||||
*/
|
||||
interface ZipkinUrlExtractor {
|
||||
URI zipkinUrl(ZipkinProperties zipkinProperties);
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves at runtime where the Zipkin server is. If there's no discovery client then
|
||||
* {@link URI} from the properties is taken. Otherwise service discovery is pinged
|
||||
* for current Zipkin address.
|
||||
*/
|
||||
class ZipkinRestTemplateWrapper extends RestTemplate {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ZipkinRestTemplateWrapper.class);
|
||||
|
||||
private final ZipkinProperties zipkinProperties;
|
||||
private final ZipkinUrlExtractor extractor;
|
||||
|
||||
ZipkinRestTemplateWrapper(ZipkinProperties zipkinProperties,
|
||||
ZipkinUrlExtractor extractor) {
|
||||
this.zipkinProperties = zipkinProperties;
|
||||
this.extractor = extractor;
|
||||
}
|
||||
|
||||
@Override protected <T> T doExecute(URI originalUrl, HttpMethod method,
|
||||
RequestCallback requestCallback,
|
||||
ResponseExtractor<T> responseExtractor) throws RestClientException {
|
||||
URI uri = this.extractor.zipkinUrl(this.zipkinProperties);
|
||||
URI newUri = resolvedZipkinUri(originalUrl, uri);
|
||||
return super.doExecute(newUri, method, requestCallback, responseExtractor);
|
||||
}
|
||||
|
||||
private URI resolvedZipkinUri(URI originalUrl, URI resolvedZipkinUri) {
|
||||
try {
|
||||
return new URI(resolvedZipkinUri.getScheme(), resolvedZipkinUri.getUserInfo(),
|
||||
resolvedZipkinUri.getHost(), resolvedZipkinUri.getPort(), originalUrl.getPath(),
|
||||
originalUrl.getQuery(), originalUrl.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Failed to create the new URI from original [" + originalUrl + "] and new one [" + resolvedZipkinUri + "]");
|
||||
}
|
||||
return originalUrl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,9 +32,9 @@ import zipkin2.reporter.Reporter;
|
||||
/**
|
||||
* Listener of Sleuth events. Reports to Zipkin via {@link Reporter}.
|
||||
*/
|
||||
public class ZipkinSpanListener implements SpanReporter {
|
||||
public class ZipkinSpanReporter implements SpanReporter {
|
||||
private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
|
||||
.getLog(ZipkinSpanListener.class);
|
||||
.getLog(ZipkinSpanReporter.class);
|
||||
|
||||
private final Reporter<zipkin2.Span> reporter;
|
||||
private final Environment environment;
|
||||
@@ -46,7 +46,7 @@ public class ZipkinSpanListener implements SpanReporter {
|
||||
// Visible for testing
|
||||
final EndpointLocator endpointLocator;
|
||||
|
||||
public ZipkinSpanListener(Reporter<zipkin2.Span> reporter, EndpointLocator endpointLocator,
|
||||
public ZipkinSpanReporter(Reporter<zipkin2.Span> reporter, EndpointLocator endpointLocator,
|
||||
Environment environment, List<SpanAdjuster> spanAdjusters) {
|
||||
this.reporter = reporter;
|
||||
this.endpointLocator = endpointLocator;
|
||||
@@ -202,4 +202,9 @@ public class ZipkinSpanListener implements SpanReporter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(){
|
||||
return "ZipkinSpanReporter(" + this.reporter + ")";
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2;
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@@ -0,0 +1,51 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import zipkin2.reporter.Sender;
|
||||
import zipkin2.reporter.kafka11.KafkaSender;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(ByteArraySerializer.class)
|
||||
@ConditionalOnBean(KafkaProperties.class)
|
||||
@ConditionalOnMissingBean(Sender.class)
|
||||
@Conditional(ZipkinSenderCondition.class)
|
||||
class ZipkinKafkaSenderConfiguration {
|
||||
@Value("${spring.zipkin.kafka.topic:zipkin}")
|
||||
private String topic;
|
||||
|
||||
@Bean Sender kafkaSender(KafkaProperties config) {
|
||||
Map<String, Object> properties = config.buildProducerProperties();
|
||||
properties.put("key.serializer", ByteArraySerializer.class.getName());
|
||||
properties.put("value.serializer", ByteArraySerializer.class.getName());
|
||||
// Kafka expects the input to be a String, but KafkaProperties returns a list
|
||||
Object bootstrapServers = properties.get("bootstrap.servers");
|
||||
if (bootstrapServers instanceof List) {
|
||||
properties.put("bootstrap.servers", join((List) bootstrapServers));
|
||||
}
|
||||
return KafkaSender.newBuilder()
|
||||
.topic(this.topic)
|
||||
.overrides(properties)
|
||||
.build();
|
||||
}
|
||||
|
||||
static String join(List<?> parts) {
|
||||
StringBuilder to = new StringBuilder();
|
||||
for (int i = 0, length = parts.size(); i < length; i++) {
|
||||
to.append(parts.get(i));
|
||||
if (i + 1 < length) {
|
||||
to.append(',');
|
||||
}
|
||||
}
|
||||
return to.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import zipkin2.reporter.Sender;
|
||||
import zipkin2.reporter.amqp.RabbitMQSender;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnBean(CachingConnectionFactory.class)
|
||||
@ConditionalOnMissingBean(Sender.class)
|
||||
@Conditional(ZipkinSenderCondition.class)
|
||||
class ZipkinRabbitSenderConfiguration {
|
||||
@Value("${spring.zipkin.rabbitmq.queue:zipkin}")
|
||||
private String queue;
|
||||
|
||||
@Bean Sender rabbitSender(CachingConnectionFactory connectionFactory, RabbitProperties config) {
|
||||
return RabbitMQSender.newBuilder()
|
||||
.connectionFactory(connectionFactory.getRabbitConnectionFactory())
|
||||
.queue(this.queue)
|
||||
.addresses(config.determineAddresses())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingClass;
|
||||
import org.springframework.cloud.client.ServiceInstance;
|
||||
import org.springframework.cloud.client.discovery.DiscoveryClient;
|
||||
import org.springframework.cloud.sleuth.zipkin2.ZipkinProperties;
|
||||
import org.springframework.cloud.sleuth.zipkin2.ZipkinRestTemplateCustomizer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Conditional;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.web.client.RequestCallback;
|
||||
import org.springframework.web.client.ResponseExtractor;
|
||||
import org.springframework.web.client.RestClientException;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
import zipkin2.reporter.Sender;
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnMissingBean(Sender.class)
|
||||
@Conditional(ZipkinSenderCondition.class)
|
||||
class ZipkinRestTemplateSenderConfiguration {
|
||||
@Autowired ZipkinUrlExtractor extractor;
|
||||
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public Sender restTemplateSender(ZipkinProperties zipkin,
|
||||
ZipkinRestTemplateCustomizer zipkinRestTemplateCustomizer) {
|
||||
RestTemplate restTemplate = new ZipkinRestTemplateWrapper(zipkin, this.extractor);
|
||||
zipkinRestTemplateCustomizer.customize(restTemplate);
|
||||
return new RestTemplateSender(restTemplate, zipkin.getBaseUrl(), zipkin.getEncoder());
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnMissingClass("org.springframework.cloud.client.discovery.DiscoveryClient")
|
||||
static class DefaultZipkinUrlExtractorConfiguration {
|
||||
@Bean
|
||||
ZipkinUrlExtractor zipkinUrlExtractor() {
|
||||
return new ZipkinUrlExtractor() {
|
||||
@Override
|
||||
public URI zipkinUrl(ZipkinProperties zipkinProperties) {
|
||||
return URI.create(zipkinProperties.getBaseUrl());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@ConditionalOnClass(DiscoveryClient.class)
|
||||
static class DiscoveryClientZipkinUrlExtractorConfiguration {
|
||||
|
||||
@Autowired(required = false) DiscoveryClient discoveryClient;
|
||||
|
||||
@Bean
|
||||
ZipkinUrlExtractor zipkinUrlExtractor() {
|
||||
final DiscoveryClient discoveryClient = this.discoveryClient;
|
||||
return new ZipkinUrlExtractor() {
|
||||
@Override
|
||||
public URI zipkinUrl(ZipkinProperties zipkinProperties) {
|
||||
if (discoveryClient != null) {
|
||||
URI uri = URI.create(zipkinProperties.getBaseUrl());
|
||||
String host = uri.getHost();
|
||||
List<ServiceInstance> instances = discoveryClient.getInstances(host);
|
||||
if (!instances.isEmpty()) {
|
||||
return instances.get(0).getUri();
|
||||
}
|
||||
}
|
||||
return URI.create(zipkinProperties.getBaseUrl());
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolves at runtime where the Zipkin server is. If there's no discovery client then {@link URI}
|
||||
* from the properties is taken. Otherwise service discovery is pinged for current Zipkin address.
|
||||
*/
|
||||
class ZipkinRestTemplateWrapper extends RestTemplate {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ZipkinRestTemplateWrapper.class);
|
||||
|
||||
private final ZipkinProperties zipkinProperties;
|
||||
private final ZipkinUrlExtractor extractor;
|
||||
|
||||
ZipkinRestTemplateWrapper(ZipkinProperties zipkinProperties,
|
||||
ZipkinUrlExtractor extractor) {
|
||||
this.zipkinProperties = zipkinProperties;
|
||||
this.extractor = extractor;
|
||||
}
|
||||
|
||||
@Override protected <T> T doExecute(URI originalUrl, HttpMethod method,
|
||||
RequestCallback requestCallback,
|
||||
ResponseExtractor<T> responseExtractor) throws RestClientException {
|
||||
URI uri = this.extractor.zipkinUrl(this.zipkinProperties);
|
||||
URI newUri = resolvedZipkinUri(originalUrl, uri);
|
||||
return super.doExecute(newUri, method, requestCallback, responseExtractor);
|
||||
}
|
||||
|
||||
private URI resolvedZipkinUri(URI originalUrl, URI resolvedZipkinUri) {
|
||||
try {
|
||||
return new URI(resolvedZipkinUri.getScheme(), resolvedZipkinUri.getUserInfo(),
|
||||
resolvedZipkinUri.getHost(), resolvedZipkinUri.getPort(), originalUrl.getPath(),
|
||||
originalUrl.getQuery(), originalUrl.getFragment());
|
||||
} catch (URISyntaxException e) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Failed to create the new URI from original ["
|
||||
+ originalUrl
|
||||
+ "] and new one ["
|
||||
+ resolvedZipkinUri
|
||||
+ "]");
|
||||
}
|
||||
return originalUrl;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal interface to provide a way to retrieve Zipkin URI. If there's no discovery client then
|
||||
* this value will be taken from the properties. Otherwise host will be assumed to be a service id.
|
||||
*/
|
||||
interface ZipkinUrlExtractor {
|
||||
URI zipkinUrl(ZipkinProperties zipkinProperties);
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionMessage;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionOutcome;
|
||||
import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
|
||||
import org.springframework.boot.bind.RelaxedPropertyResolver;
|
||||
import org.springframework.context.annotation.ConditionContext;
|
||||
import org.springframework.core.type.AnnotatedTypeMetadata;
|
||||
import org.springframework.core.type.AnnotationMetadata;
|
||||
import org.springframework.core.type.ClassMetadata;
|
||||
|
||||
import static org.springframework.cloud.sleuth.zipkin2.sender.ZipkinSenderConfigurationImportSelector.getType;
|
||||
|
||||
/** Attach this to any new sender configuration. */
|
||||
class ZipkinSenderCondition extends SpringBootCondition {
|
||||
|
||||
@Override
|
||||
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata md) {
|
||||
String sourceClass = "";
|
||||
if (md instanceof ClassMetadata) {
|
||||
sourceClass = ((ClassMetadata) md).getClassName();
|
||||
}
|
||||
ConditionMessage.Builder message = ConditionMessage.forCondition("ZipkinSender", sourceClass);
|
||||
RelaxedPropertyResolver resolver = new RelaxedPropertyResolver(
|
||||
context.getEnvironment(), "spring.zipkin.sender.");
|
||||
if (!resolver.containsProperty("type")) {
|
||||
return ConditionOutcome.match(message.because("automatic sender type"));
|
||||
}
|
||||
|
||||
String senderType = getType(((AnnotationMetadata) md).getClassName());
|
||||
String value = resolver.getProperty("type");
|
||||
if (value.equals(senderType)) {
|
||||
return ConditionOutcome.match(message.because(value + " sender type"));
|
||||
}
|
||||
return ConditionOutcome.noMatch(message.because(value + " sender type"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package org.springframework.cloud.sleuth.zipkin2.sender;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import org.springframework.context.annotation.ImportSelector;
|
||||
import org.springframework.core.type.AnnotationMetadata;
|
||||
|
||||
public class ZipkinSenderConfigurationImportSelector implements ImportSelector {
|
||||
|
||||
static final Map<String, String> MAPPINGS;
|
||||
|
||||
// Classes below must be annotated with @Conditional(ZipkinSenderCondition.class)
|
||||
static {
|
||||
// Mappings in descending priority (highest is last)
|
||||
Map<String, String> mappings = new LinkedHashMap<>();
|
||||
mappings.put("rabbit", ZipkinRabbitSenderConfiguration.class.getName());
|
||||
mappings.put("kafka", ZipkinKafkaSenderConfiguration.class.getName());
|
||||
mappings.put("web", ZipkinRestTemplateSenderConfiguration.class.getName());
|
||||
MAPPINGS = Collections.unmodifiableMap(mappings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
|
||||
return MAPPINGS.values().toArray(new String[0]);
|
||||
}
|
||||
|
||||
static String getType(String configurationClassName) {
|
||||
for (Map.Entry<String, String> entry : MAPPINGS.entrySet()) {
|
||||
if (entry.getValue().equals(configurationClassName)) {
|
||||
return entry.getKey();
|
||||
}
|
||||
}
|
||||
throw new IllegalStateException("Unknown configuration class " + configurationClassName);
|
||||
}
|
||||
}
|
||||
@@ -23,13 +23,18 @@ import org.junit.After;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
import org.springframework.cloud.sleuth.Span;
|
||||
import org.springframework.cloud.sleuth.SpanReporter;
|
||||
import org.springframework.cloud.sleuth.metric.TraceMetricsAutoConfiguration;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import zipkin2.reporter.amqp.RabbitMQSender;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.BDDAssertions.then;
|
||||
import static org.springframework.boot.test.util.EnvironmentTestUtils.addEnvironment;
|
||||
|
||||
@@ -92,4 +97,78 @@ public class ZipkinAutoConfigurationTests {
|
||||
then(request.getPath()).isEqualTo("/api/v1/spans");
|
||||
then(request.getBody().readUtf8()).contains("binaryAnnotations");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void overrideRabbitMQQueue() throws Exception {
|
||||
context = new AnnotationConfigApplicationContext();
|
||||
addEnvironment(context, "spring.zipkin.rabbitmq.queue:zipkin2");
|
||||
context.register(
|
||||
PropertyPlaceholderAutoConfiguration.class,
|
||||
TraceMetricsAutoConfiguration.class,
|
||||
RabbitAutoConfiguration.class,
|
||||
ZipkinAutoConfiguration.class);
|
||||
context.refresh();
|
||||
|
||||
SpanReporter spanReporter = context.getBean(SpanReporter.class);
|
||||
assertThat(spanReporter).extracting("reporter.sender.queue")
|
||||
.contains("zipkin2");
|
||||
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void overrideKafkaTopic() throws Exception {
|
||||
context = new AnnotationConfigApplicationContext();
|
||||
addEnvironment(context, "spring.zipkin.kafka.topic:zipkin2");
|
||||
context.register(
|
||||
PropertyPlaceholderAutoConfiguration.class,
|
||||
TraceMetricsAutoConfiguration.class,
|
||||
KafkaAutoConfiguration.class,
|
||||
ZipkinAutoConfiguration.class);
|
||||
context.refresh();
|
||||
|
||||
SpanReporter spanReporter = context.getBean(SpanReporter.class);
|
||||
assertThat(spanReporter).extracting("reporter.sender.topic")
|
||||
.contains("zipkin2");
|
||||
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void canOverrideBySender() throws Exception {
|
||||
context = new AnnotationConfigApplicationContext();
|
||||
addEnvironment(context, "spring.zipkin.sender.type:web");
|
||||
context.register(
|
||||
PropertyPlaceholderAutoConfiguration.class,
|
||||
TraceMetricsAutoConfiguration.class,
|
||||
RabbitAutoConfiguration.class,
|
||||
KafkaAutoConfiguration.class,
|
||||
ZipkinAutoConfiguration.class);
|
||||
context.refresh();
|
||||
|
||||
SpanReporter spanReporter = context.getBean(SpanReporter.class);
|
||||
assertThat(spanReporter).extracting("reporter.sender").allSatisfy(
|
||||
s -> assertThat(s.getClass().getSimpleName()).isEqualTo("RestTemplateSender")
|
||||
);
|
||||
|
||||
context.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void rabbitWinsWhenKafkaPresent() throws Exception {
|
||||
context = new AnnotationConfigApplicationContext();
|
||||
context.register(
|
||||
PropertyPlaceholderAutoConfiguration.class,
|
||||
TraceMetricsAutoConfiguration.class,
|
||||
RabbitAutoConfiguration.class,
|
||||
KafkaAutoConfiguration.class,
|
||||
ZipkinAutoConfiguration.class);
|
||||
context.refresh();
|
||||
|
||||
SpanReporter spanReporter = context.getBean(SpanReporter.class);
|
||||
assertThat(spanReporter).extracting("reporter.sender")
|
||||
.allSatisfy(s -> assertThat(s).isInstanceOf(RabbitMQSender.class));
|
||||
|
||||
context.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,8 +26,10 @@ import org.springframework.test.context.junit4.SpringRunner;
|
||||
import zipkin.junit.ZipkinRule;
|
||||
|
||||
@RunWith(SpringRunner.class)
|
||||
@SpringBootTest(classes = ZipkinDiscoveryClientTests.Config.class,
|
||||
properties = "spring.zipkin.baseUrl=http://zipkin/")
|
||||
@SpringBootTest(classes = ZipkinDiscoveryClientTests.Config.class, properties = {
|
||||
"spring.zipkin.baseUrl=http://zipkin/",
|
||||
"spring.zipkin.sender.type=web" // override default priority which picks rabbit due to classpath
|
||||
})
|
||||
public class ZipkinDiscoveryClientTests {
|
||||
|
||||
@ClassRule public static ZipkinRule ZIPKIN_RULE = new ZipkinRule();
|
||||
|
||||
@@ -32,7 +32,7 @@ import org.springframework.cloud.sleuth.SpanAdjuster;
|
||||
import org.springframework.cloud.sleuth.SpanReporter;
|
||||
import org.springframework.cloud.sleuth.Tracer;
|
||||
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
|
||||
import org.springframework.cloud.sleuth.zipkin2.ZipkinSpanListenerTests.TestConfiguration;
|
||||
import org.springframework.cloud.sleuth.zipkin2.ZipkinSpanReporterTests.TestConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Primary;
|
||||
@@ -52,12 +52,12 @@ import static org.junit.Assert.assertEquals;
|
||||
*/
|
||||
@SpringBootTest(classes = TestConfiguration.class)
|
||||
@RunWith(SpringRunner.class)
|
||||
public class ZipkinSpanListenerTests {
|
||||
public class ZipkinSpanReporterTests {
|
||||
|
||||
@Autowired Tracer tracer;
|
||||
@Autowired TestConfiguration test;
|
||||
@Autowired ZipkinSpanListener spanListener;
|
||||
@Autowired Reporter<zipkin2.Span> spanReporter;
|
||||
@Autowired ZipkinSpanReporter spanReporter;
|
||||
@Autowired Reporter<zipkin2.Span> zipkinReporter;
|
||||
@Autowired MockEnvironment mockEnvironment;
|
||||
@Autowired EndpointLocator endpointLocator;
|
||||
|
||||
@@ -76,7 +76,7 @@ public class ZipkinSpanListenerTests {
|
||||
span.logEvent("hystrix/retry"); // System.currentTimeMillis
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
assertThat(result.timestamp())
|
||||
.isEqualTo(span.getBegin() * 1000);
|
||||
@@ -97,7 +97,7 @@ public class ZipkinSpanListenerTests {
|
||||
Thread.sleep(20);
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
assertThat(result.timestamp()).isEqualTo(span.getBegin() * 1000);
|
||||
long clientSendTimestamp = span.logs().stream()
|
||||
@@ -114,7 +114,7 @@ public class ZipkinSpanListenerTests {
|
||||
@Test
|
||||
public void doesntSetDurationWhenStillRunning() {
|
||||
Span span = Span.builder().traceId(1L).name("http:api").build();
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
assertThat(result.timestamp())
|
||||
.isGreaterThan(0); // sanity check it did start
|
||||
@@ -128,16 +128,16 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.logEvent("hystrix/retry");
|
||||
this.parent.tag("spring-boot/version", "1.3.1.RELEASE");
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.localEndpoint())
|
||||
.isEqualTo(this.spanListener.endpointLocator.local());
|
||||
.isEqualTo(this.spanReporter.endpointLocator.local());
|
||||
}
|
||||
|
||||
/** zipkin's Endpoint.serviceName should never be null. */
|
||||
@Test
|
||||
public void localEndpointIncludesServiceName() {
|
||||
assertThat(this.spanListener.endpointLocator.local().serviceName())
|
||||
assertThat(this.spanReporter.endpointLocator.local().serviceName())
|
||||
.isNotEmpty();
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ public class ZipkinSpanListenerTests {
|
||||
Span context = this.tracer.createSpan("http:child", this.parent);
|
||||
context.logEvent(Span.CLIENT_SEND);
|
||||
logServerReceived(this.parent);
|
||||
logServerSent(this.spanListener, this.parent);
|
||||
logServerSent(this.spanReporter, this.parent);
|
||||
this.tracer.close(context);
|
||||
assertEquals(2, this.test.zipkinSpans.size());
|
||||
}
|
||||
@@ -178,7 +178,7 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.logEvent("hystrix/retry");
|
||||
this.parent.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.tags())
|
||||
.isEmpty();
|
||||
@@ -190,7 +190,7 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.tag(Span.SPAN_PEER_SERVICE_TAG_NAME, "fooservice");
|
||||
this.parent.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.remoteEndpoint())
|
||||
.isEqualTo(Endpoint.newBuilder().serviceName("fooservice").build());
|
||||
@@ -201,7 +201,7 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.logEvent("cs");
|
||||
this.parent.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.remoteEndpoint())
|
||||
.isNull();
|
||||
@@ -211,7 +211,7 @@ public class ZipkinSpanListenerTests {
|
||||
public void converts128BitTraceId() {
|
||||
Span span = Span.builder().traceIdHigh(1L).traceId(2L).spanId(3L).name("foo").build();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
assertThat(result.traceId())
|
||||
.isEqualTo("00000000000000010000000000000002");
|
||||
@@ -223,7 +223,7 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.tag(Span.SPAN_PEER_SERVICE_TAG_NAME, "fooservice");
|
||||
this.parent.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.remoteEndpoint())
|
||||
.isEqualTo(Endpoint.newBuilder().serviceName("fooservice").build());
|
||||
@@ -233,7 +233,7 @@ public class ZipkinSpanListenerTests {
|
||||
public void shouldNotReportToZipkinWhenSpanIsNotExportable() {
|
||||
Span span = Span.builder().exportable(false).build();
|
||||
|
||||
this.spanListener.report(span);
|
||||
this.spanReporter.report(span);
|
||||
|
||||
assertThat(this.test.zipkinSpans).isEmpty();
|
||||
}
|
||||
@@ -243,7 +243,7 @@ public class ZipkinSpanListenerTests {
|
||||
this.parent.logEvent(Span.CLIENT_SEND);
|
||||
this.mockEnvironment.setProperty("vcap.application.instance_id", "foo");
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(this.parent);
|
||||
zipkin2.Span result = this.spanReporter.convert(this.parent);
|
||||
|
||||
assertThat(result.tags())
|
||||
.containsExactly(entry(Span.INSTANCEID, "foo"));
|
||||
@@ -252,7 +252,7 @@ public class ZipkinSpanListenerTests {
|
||||
@Test
|
||||
public void shouldNotAddAnyServiceIdTagWhenSpanContainsRpcEventAndThereIsNoEnvironment() {
|
||||
this.parent.logEvent(Span.CLIENT_RECV);
|
||||
ZipkinSpanListener spanListener = new ZipkinSpanListener(this.spanReporter,
|
||||
ZipkinSpanReporter spanListener = new ZipkinSpanReporter(this.zipkinReporter,
|
||||
this.endpointLocator, null, new ArrayList<>());
|
||||
|
||||
zipkin2.Span result = spanListener.convert(this.parent);
|
||||
@@ -264,7 +264,7 @@ public class ZipkinSpanListenerTests {
|
||||
@Test
|
||||
public void should_adjust_span_before_reporting_it() {
|
||||
this.parent.logEvent(Span.CLIENT_RECV);
|
||||
ZipkinSpanListener spanListener = new ZipkinSpanListener(this.spanReporter,
|
||||
ZipkinSpanReporter spanListener = new ZipkinSpanReporter(this.zipkinReporter,
|
||||
this.endpointLocator, null, Collections.<SpanAdjuster>singletonList(
|
||||
span -> Span.builder().from(span).name("foo").build())) {
|
||||
@Override String defaultInstanceId() {
|
||||
@@ -288,7 +288,7 @@ public class ZipkinSpanListenerTests {
|
||||
.build();
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
assertThat(result.duration()).isNotNull();
|
||||
assertThat(result.timestamp()).isNotNull();
|
||||
@@ -302,7 +302,7 @@ public class ZipkinSpanListenerTests {
|
||||
span.logEvent("ss");
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
then(result.kind()).isEqualTo(zipkin2.Span.Kind.SERVER);
|
||||
then(result.annotations()).isEmpty();
|
||||
@@ -315,7 +315,7 @@ public class ZipkinSpanListenerTests {
|
||||
span.logEvent("cr");
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
then(result.kind()).isEqualTo(zipkin2.Span.Kind.CLIENT);
|
||||
then(result.annotations()).isEmpty();
|
||||
@@ -327,7 +327,7 @@ public class ZipkinSpanListenerTests {
|
||||
span.logEvent("ms");
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
then(result.kind()).isEqualTo(zipkin2.Span.Kind.PRODUCER);
|
||||
then(result.annotations()).isEmpty();
|
||||
@@ -339,7 +339,7 @@ public class ZipkinSpanListenerTests {
|
||||
span.logEvent("mr");
|
||||
span.stop();
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
then(result.kind()).isEqualTo(zipkin2.Span.Kind.CONSUMER);
|
||||
then(result.annotations()).isEmpty();
|
||||
@@ -364,7 +364,7 @@ public class ZipkinSpanListenerTests {
|
||||
}
|
||||
// end::service_name[]
|
||||
|
||||
zipkin2.Span result = this.spanListener.convert(span);
|
||||
zipkin2.Span result = this.spanReporter.convert(span);
|
||||
|
||||
then(result.remoteEndpoint())
|
||||
.isEqualTo(Endpoint.newBuilder().serviceName("redis").ip("1.2.3.4").port(1234).build());
|
||||
Reference in New Issue
Block a user