ADded mechanism to fallback from the deprecated sleuth-stream module

without this change the kafka binder wasn't able to propagate tracing headers without explicitly adding on classpath the deprecated sleuth-stream module
with this change we're adding the stream support in core of Sleuth

fixes gh-1005
This commit is contained in:
Marcin Grzejszczak
2018-06-11 15:18:43 +02:00
parent 503e7fe917
commit 6d5a8a60f1
3 changed files with 266 additions and 1 deletions

View File

@@ -0,0 +1,151 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.autoconfig;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.cloud.sleuth.instrument.messaging.TraceMessageHeaders;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.PropertySource;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.PropertiesLoaderUtils;
/**
* {@link EnvironmentPostProcessor} that sets the default properties for
* Sleuth Stream. Since the sleuth-stream module gets deprecated, we've
* copied the module to core with small adjustments to ensure that the
* tracing headers are copied. That's required for the Kafka binder.
*
* @author Dave Syer
* @since 1.3.5
*/
class TraceStreamEnvironmentPostProcessor implements EnvironmentPostProcessor {
private static final String PROPERTY_SOURCE_NAME = "defaultProperties";
static final String[] HEADERS = new String[] { TraceMessageHeaders.SPAN_ID_NAME,
TraceMessageHeaders.TRACE_ID_NAME, TraceMessageHeaders.PARENT_ID_NAME, TraceMessageHeaders.PROCESS_ID_NAME,
TraceMessageHeaders.SAMPLED_NAME, TraceMessageHeaders.SPAN_NAME_NAME };
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment,
SpringApplication application) {
Map<String, Object> map = new HashMap<String, Object>();
ResourceLoader resourceLoader = application.getResourceLoader();
resourceLoader = resourceLoader == null ? new DefaultResourceLoader()
: resourceLoader;
PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(
resourceLoader);
try {
for (Resource resource : getAllSpringBinders(resolver)) {
for (String binderType : parseBinderConfigurations(resource)) {
int startIndex = findStartIndex(environment, binderType);
addHeaders(map, environment.getPropertySources(), binderType, startIndex);
}
}
}
catch (IOException e) {
throw new IllegalStateException("Cannot load META-INF/spring.binders", e);
}
addOrReplace(environment.getPropertySources(), map);
}
Resource[] getAllSpringBinders(PathMatchingResourcePatternResolver resolver)
throws IOException {
return resolver
.getResources("classpath*:META-INF/spring.binders");
}
private int findStartIndex(ConfigurableEnvironment environment, String binder) {
String prefix = "spring.cloud.stream." + binder + ".binder.HEADERS";
int i = 0;
String oldHeaders = environment.getProperty(prefix);
if (oldHeaders != null) {
i = oldHeaders.split(",").length;
}
while (environment.getProperty(prefix + "[" + i + "]") != null) {
i++;
}
return i;
}
Collection<String> parseBinderConfigurations(Resource resource) {
Collection<String> keys = new HashSet<>();
try {
Properties props = PropertiesLoaderUtils.loadProperties(resource);
for (Object object : props.keySet()) {
keys.add(object.toString());
}
}
catch (IOException e) {
}
return keys;
}
private void addOrReplace(MutablePropertySources propertySources,
Map<String, Object> map) {
MapPropertySource target = null;
if (propertySources.contains(PROPERTY_SOURCE_NAME)) {
PropertySource<?> source = propertySources.get(PROPERTY_SOURCE_NAME);
if (source instanceof MapPropertySource) {
target = (MapPropertySource) source;
for (String key : map.keySet()) {
if (!target.containsProperty(key)) {
target.getSource().put(key, map.get(key));
}
}
}
}
if (target == null) {
target = new MapPropertySource(PROPERTY_SOURCE_NAME, map);
}
if (!propertySources.contains(PROPERTY_SOURCE_NAME)) {
propertySources.addLast(target);
}
}
private void addHeaders(Map<String, Object> map, MutablePropertySources propertySources,
String binder, int startIndex) {
String stem = "spring.cloud.stream." + binder + ".binder.HEADERS";
for (int i = 0; i < HEADERS.length; i++) {
if (!hasTracingHeadersValue(propertySources, HEADERS[i])) {
map.put(stem + "[" + (i + startIndex) + "]", HEADERS[i]);
}
}
}
private boolean hasTracingHeadersValue(MutablePropertySources propertySources, String header) {
PropertySource<?> source = propertySources.get(PROPERTY_SOURCE_NAME);
if (source instanceof MapPropertySource) {
Collection<Object> values = ((MapPropertySource) source).getSource().values();
return values.contains(header);
}
return false;
}
}

View File

@@ -21,4 +21,5 @@ org.springframework.cloud.sleuth.annotation.SleuthAnnotationAutoConfiguration
# Environment Post Processor
org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor
org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor,\
org.springframework.cloud.sleuth.autoconfig.TraceStreamEnvironmentPostProcessor

View File

@@ -0,0 +1,113 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.autoconfig;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.test.util.EnvironmentTestUtils;
import org.springframework.cloud.sleuth.instrument.messaging.TraceMessageHeaders;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Dave Syer
*
*/
public class TraceStreamEnvironmentPostProcessorTests {
private TraceStreamEnvironmentPostProcessor processor = new TraceStreamEnvironmentPostProcessor() {
@Override Resource[] getAllSpringBinders(
PathMatchingResourcePatternResolver resolver) throws IOException {
return new Resource[] { new ClassPathResource("/META-INF/spring.factories") };
}
@Override Collection<String> parseBinderConfigurations(Resource resource) {
return Collections.singleton("test");
}
};
private ConfigurableEnvironment environment = new StandardEnvironment();
@Test
public void should_append_tracing_headers() {
postProcess();
assertThat(this.environment
.getProperty("spring.cloud.stream.test.binder.HEADERS[0]"))
.isEqualTo(TraceMessageHeaders.SPAN_ID_NAME);
}
@Test
public void should_append_tracing_headers_to_existing_ones() {
EnvironmentTestUtils.addEnvironment(this.environment,
"spring.cloud.stream.test.binder.HEADERS[0]=X-Custom",
"spring.cloud.stream.test.binder.HEADERS[1]=X-Mine");
postProcess();
assertThat(this.environment
.getProperty("spring.cloud.stream.test.binder.HEADERS[2]"))
.isEqualTo(TraceMessageHeaders.SPAN_ID_NAME);
}
@Test
public void should_append_tracing_headers_to_existing_ones_in_single_line() {
EnvironmentTestUtils.addEnvironment(this.environment,
"spring.cloud.stream.test.binder.HEADERS=foo,bar");
postProcess();
assertThat(this.environment
.getProperty("spring.cloud.stream.test.binder.HEADERS[2]"))
.isEqualTo(TraceMessageHeaders.SPAN_ID_NAME);
}
@Test
public void should_not_append_tracing_headers_if_they_are_already_appended() {
postProcess();
postProcess();
postProcess();
Collection<String> headerValues = defaultPropertiesSource().values();
Collection<String> traceIds = headerValues.stream()
.filter(input -> input.contains(TraceMessageHeaders.TRACE_ID_NAME))
.collect(Collectors.toList());
assertThat(traceIds).hasSize(1);
assertThat(defaultPropertiesSource().keySet().stream()
.filter(input -> input
.startsWith("spring.cloud.stream.test.binder.HEADERS"))
.collect(Collectors.toList()))
.hasSize(TraceStreamEnvironmentPostProcessor.HEADERS.length);
}
private void postProcess() {
this.processor.postProcessEnvironment(this.environment,
new SpringApplication(TraceStreamEnvironmentPostProcessor.class));
}
private Map<String, String> defaultPropertiesSource() {
return (Map<String, String>) this.environment.getPropertySources()
.get("defaultProperties").getSource();
}
}