diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessor.java new file mode 100644 index 000000000..b94340317 --- /dev/null +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessor.java @@ -0,0 +1,151 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.autoconfig; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.env.EnvironmentPostProcessor; +import org.springframework.cloud.sleuth.instrument.messaging.TraceMessageHeaders; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; +import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.PropertySource; +import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.core.io.Resource; +import org.springframework.core.io.ResourceLoader; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; +import org.springframework.core.io.support.PropertiesLoaderUtils; + +/** + * {@link EnvironmentPostProcessor} that sets the default properties for + * Sleuth Stream. Since the sleuth-stream module gets deprecated, we've + * copied the module to core with small adjustments to ensure that the + * tracing headers are copied. That's required for the Kafka binder. + * + * @author Dave Syer + * @since 1.3.5 + */ +class TraceStreamEnvironmentPostProcessor implements EnvironmentPostProcessor { + + private static final String PROPERTY_SOURCE_NAME = "defaultProperties"; + static final String[] HEADERS = new String[] { TraceMessageHeaders.SPAN_ID_NAME, + TraceMessageHeaders.TRACE_ID_NAME, TraceMessageHeaders.PARENT_ID_NAME, TraceMessageHeaders.PROCESS_ID_NAME, + TraceMessageHeaders.SAMPLED_NAME, TraceMessageHeaders.SPAN_NAME_NAME }; + + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, + SpringApplication application) { + Map map = new HashMap(); + ResourceLoader resourceLoader = application.getResourceLoader(); + resourceLoader = resourceLoader == null ? new DefaultResourceLoader() + : resourceLoader; + PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver( + resourceLoader); + try { + for (Resource resource : getAllSpringBinders(resolver)) { + for (String binderType : parseBinderConfigurations(resource)) { + int startIndex = findStartIndex(environment, binderType); + addHeaders(map, environment.getPropertySources(), binderType, startIndex); + } + } + } + catch (IOException e) { + throw new IllegalStateException("Cannot load META-INF/spring.binders", e); + } + addOrReplace(environment.getPropertySources(), map); + } + + Resource[] getAllSpringBinders(PathMatchingResourcePatternResolver resolver) + throws IOException { + return resolver + .getResources("classpath*:META-INF/spring.binders"); + } + + private int findStartIndex(ConfigurableEnvironment environment, String binder) { + String prefix = "spring.cloud.stream." + binder + ".binder.HEADERS"; + int i = 0; + String oldHeaders = environment.getProperty(prefix); + if (oldHeaders != null) { + i = oldHeaders.split(",").length; + } + while (environment.getProperty(prefix + "[" + i + "]") != null) { + i++; + } + return i; + } + + Collection parseBinderConfigurations(Resource resource) { + Collection keys = new HashSet<>(); + try { + Properties props = PropertiesLoaderUtils.loadProperties(resource); + for (Object object : props.keySet()) { + keys.add(object.toString()); + } + } + catch (IOException e) { + } + return keys; + } + + private void addOrReplace(MutablePropertySources propertySources, + Map map) { + MapPropertySource target = null; + if (propertySources.contains(PROPERTY_SOURCE_NAME)) { + PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); + if (source instanceof MapPropertySource) { + target = (MapPropertySource) source; + for (String key : map.keySet()) { + if (!target.containsProperty(key)) { + target.getSource().put(key, map.get(key)); + } + } + } + } + if (target == null) { + target = new MapPropertySource(PROPERTY_SOURCE_NAME, map); + } + if (!propertySources.contains(PROPERTY_SOURCE_NAME)) { + propertySources.addLast(target); + } + } + + private void addHeaders(Map map, MutablePropertySources propertySources, + String binder, int startIndex) { + String stem = "spring.cloud.stream." + binder + ".binder.HEADERS"; + for (int i = 0; i < HEADERS.length; i++) { + if (!hasTracingHeadersValue(propertySources, HEADERS[i])) { + map.put(stem + "[" + (i + startIndex) + "]", HEADERS[i]); + } + } + } + + private boolean hasTracingHeadersValue(MutablePropertySources propertySources, String header) { + PropertySource source = propertySources.get(PROPERTY_SOURCE_NAME); + if (source instanceof MapPropertySource) { + Collection values = ((MapPropertySource) source).getSource().values(); + return values.contains(header); + } + return false; + } + +} diff --git a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories index 1c93d4d72..4af31b679 100644 --- a/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-sleuth-core/src/main/resources/META-INF/spring.factories @@ -21,4 +21,5 @@ org.springframework.cloud.sleuth.annotation.SleuthAnnotationAutoConfiguration # Environment Post Processor org.springframework.boot.env.EnvironmentPostProcessor=\ -org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor +org.springframework.cloud.sleuth.autoconfig.TraceEnvironmentPostProcessor,\ +org.springframework.cloud.sleuth.autoconfig.TraceStreamEnvironmentPostProcessor diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessorTests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessorTests.java new file mode 100644 index 000000000..f971804d3 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/autoconfig/TraceStreamEnvironmentPostProcessorTests.java @@ -0,0 +1,113 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.autoconfig; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.stream.Collectors; + +import org.junit.Test; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.test.util.EnvironmentTestUtils; +import org.springframework.cloud.sleuth.instrument.messaging.TraceMessageHeaders; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.StandardEnvironment; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.support.PathMatchingResourcePatternResolver; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +public class TraceStreamEnvironmentPostProcessorTests { + + private TraceStreamEnvironmentPostProcessor processor = new TraceStreamEnvironmentPostProcessor() { + @Override Resource[] getAllSpringBinders( + PathMatchingResourcePatternResolver resolver) throws IOException { + return new Resource[] { new ClassPathResource("/META-INF/spring.factories") }; + } + + @Override Collection parseBinderConfigurations(Resource resource) { + return Collections.singleton("test"); + } + }; + private ConfigurableEnvironment environment = new StandardEnvironment(); + + @Test + public void should_append_tracing_headers() { + postProcess(); + assertThat(this.environment + .getProperty("spring.cloud.stream.test.binder.HEADERS[0]")) + .isEqualTo(TraceMessageHeaders.SPAN_ID_NAME); + } + + @Test + public void should_append_tracing_headers_to_existing_ones() { + EnvironmentTestUtils.addEnvironment(this.environment, + "spring.cloud.stream.test.binder.HEADERS[0]=X-Custom", + "spring.cloud.stream.test.binder.HEADERS[1]=X-Mine"); + postProcess(); + assertThat(this.environment + .getProperty("spring.cloud.stream.test.binder.HEADERS[2]")) + .isEqualTo(TraceMessageHeaders.SPAN_ID_NAME); + } + + @Test + public void should_append_tracing_headers_to_existing_ones_in_single_line() { + EnvironmentTestUtils.addEnvironment(this.environment, + "spring.cloud.stream.test.binder.HEADERS=foo,bar"); + postProcess(); + assertThat(this.environment + .getProperty("spring.cloud.stream.test.binder.HEADERS[2]")) + .isEqualTo(TraceMessageHeaders.SPAN_ID_NAME); + } + + @Test + public void should_not_append_tracing_headers_if_they_are_already_appended() { + postProcess(); + postProcess(); + postProcess(); + + Collection headerValues = defaultPropertiesSource().values(); + + Collection traceIds = headerValues.stream() + .filter(input -> input.contains(TraceMessageHeaders.TRACE_ID_NAME)) + .collect(Collectors.toList()); + assertThat(traceIds).hasSize(1); + assertThat(defaultPropertiesSource().keySet().stream() + .filter(input -> input + .startsWith("spring.cloud.stream.test.binder.HEADERS")) + .collect(Collectors.toList())) + .hasSize(TraceStreamEnvironmentPostProcessor.HEADERS.length); + } + + private void postProcess() { + this.processor.postProcessEnvironment(this.environment, + new SpringApplication(TraceStreamEnvironmentPostProcessor.class)); + } + + private Map defaultPropertiesSource() { + return (Map) this.environment.getPropertySources() + .get("defaultProperties").getSource(); + } + +}