From 87ed7b8291e8f6be144fa81cf6bd6c8f701f0ecf Mon Sep 17 00:00:00 2001 From: Marcin Grzejszczak Date: Mon, 16 Apr 2018 10:37:58 +0200 Subject: [PATCH] Reuses the span from a message only for non direct channels; fixes gh-943 (#945) --- .../messaging/TraceChannelInterceptor.java | 41 +++++++-- .../issue_943/CustomExecutorConfig.java | 46 ++++++++++ .../issue_943/HelloSpringIntegration.java | 56 +++++++++++++ .../issues/issue_943/HelloWorldImpl.java | 52 ++++++++++++ .../issue_943/HelloWorldRestController.java | 68 +++++++++++++++ .../issues/issue_943/Issue943Tests.java | 70 ++++++++++++++++ .../issues/issue_943/MessagingGateway.java | 23 +++++ .../resources/beans/applicationContext.xml | 83 +++++++++++++++++++ 8 files changed, 433 insertions(+), 6 deletions(-) create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/CustomExecutorConfig.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloSpringIntegration.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldImpl.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldRestController.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/Issue943Tests.java create mode 100644 spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/MessagingGateway.java create mode 100644 spring-cloud-sleuth-core/src/test/resources/beans/applicationContext.xml diff --git a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java index e0800969c..fb44297aa 100644 --- a/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java +++ b/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/messaging/TraceChannelInterceptor.java @@ -43,6 +43,8 @@ import org.springframework.messaging.support.MessageHeaderAccessor; */ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { + private static final String ASYNC_COMPONENT = "async"; + private static final org.apache.commons.logging.Log log = LogFactory .getLog(TraceChannelInterceptor.class); @@ -61,8 +63,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } Message retrievedMessage = getMessage(message); MessageBuilder messageBuilder = MessageBuilder.fromMessage(retrievedMessage); - Span currentSpan = getTracer().isTracing() ? getTracer().getCurrentSpan() - : buildSpan(new MessagingTextMap(messageBuilder)); + Span currentSpan = currentSpanOrFromHeaders(messageBuilder); if (log.isDebugEnabled()) { log.debug("Completed sending and current span is " + currentSpan); } @@ -83,6 +84,36 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } } + private Span extractSpanOrPickCurrent(MessageBuilder messageBuilder, + MessageChannel channel) { + if (isDirectChannel(channel)) { + return currentSpanOrFromHeaders(messageBuilder); + } + Span spanFromMessage = spanFromHeaders(messageBuilder); + Span currentSpan = getTracer().getCurrentSpan(); + if (currentSpan != null && currentSpan.equals(spanFromMessage)) { + return currentSpan; + } else if (spanComesFromAsync(currentSpan)) { + getTracer().detach(currentSpan); + getTracer().continueSpan(spanFromMessage); + } + return spanFromMessage; + } + + private boolean spanComesFromAsync(Span currentSpan) { + return currentSpan != null && currentSpan.getSpanId() == currentSpan.getTraceId() + && currentSpan.getName().equals(ASYNC_COMPONENT); + } + + private Span currentSpanOrFromHeaders(MessageBuilder messageBuilder) { + return getTracer().isTracing() ? getTracer().getCurrentSpan() + : spanFromHeaders(messageBuilder); + } + + private Span spanFromHeaders(MessageBuilder messageBuilder) { + return buildSpan(new MessagingTextMap(messageBuilder)); + } + @Override public Message preSend(Message message, MessageChannel channel) { if (emptyMessage(message)) { @@ -93,8 +124,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } Message retrievedMessage = getMessage(message); MessageBuilder messageBuilder = MessageBuilder.fromMessage(retrievedMessage); - Span parentSpan = getTracer().isTracing() ? getTracer().getCurrentSpan() - : buildSpan(new MessagingTextMap(messageBuilder)); + Span parentSpan = currentSpanOrFromHeaders(messageBuilder); // Do not continue the parent (assume that this is handled by caller) // getTracer().continueSpan(parentSpan); if (log.isDebugEnabled()) { @@ -184,8 +214,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor { } Message retrievedMessage = getMessage(message); MessageBuilder messageBuilder = MessageBuilder.fromMessage(retrievedMessage); - Span spanFromHeader = getTracer().isTracing() ? getTracer().getCurrentSpan() - : buildSpan(new MessagingTextMap(messageBuilder)); + Span spanFromHeader = extractSpanOrPickCurrent(messageBuilder, channel); if (log.isDebugEnabled()) { log.debug("Continuing span " + spanFromHeader + " before handling message"); } diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/CustomExecutorConfig.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/CustomExecutorConfig.java new file mode 100644 index 000000000..2961b43d8 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/CustomExecutorConfig.java @@ -0,0 +1,46 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +import java.util.concurrent.Executor; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.sleuth.instrument.async.LazyTraceExecutor; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurerSupport; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@EnableAsync +@Configuration +public class CustomExecutorConfig extends AsyncConfigurerSupport { + + @Autowired BeanFactory beanFactory; + + @Override public Executor getAsyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // CUSTOMIZE HERE + executor.setCorePoolSize(7); + executor.setMaxPoolSize(42); + executor.setQueueCapacity(11); + executor.setThreadNamePrefix("MyExecutor-"); + // DON'T FORGET TO INITIALIZE + executor.initialize(); + return new LazyTraceExecutor(this.beanFactory, executor); + } +} \ No newline at end of file diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloSpringIntegration.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloSpringIntegration.java new file mode 100644 index 000000000..d4b7c621a --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloSpringIntegration.java @@ -0,0 +1,56 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; +import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; +import org.springframework.cloud.sleuth.sampler.AlwaysSampler; +import org.springframework.cloud.sleuth.util.ArrayListSpanAccumulator; +import org.springframework.cloud.sleuth.util.ExceptionUtils; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ImportResource; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.web.client.RestTemplate; + +@SpringBootApplication +@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,HibernateJpaAutoConfiguration.class}) +@ImportResource("classpath:beans/applicationContext.xml") +@EnableIntegration +@EnableAsync +public class HelloSpringIntegration { + + public static void main(String[] args) { + ExceptionUtils.setFail(true); + SpringApplication.run(HelloSpringIntegration.class, args); + } + + @Bean AlwaysSampler sampler() { + return new AlwaysSampler(); + } + + @Bean RestTemplate restTemplate() { + return new RestTemplate(); + } + + @Bean ArrayListSpanAccumulator accumulator() { + return new ArrayListSpanAccumulator(); + } +} diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldImpl.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldImpl.java new file mode 100644 index 000000000..bacdfb7a7 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldImpl.java @@ -0,0 +1,52 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HelloWorldImpl { + + private static final Logger LOG = LoggerFactory.getLogger(HelloWorldImpl.class); + + public String invokeProcessor(String message) throws InterruptedException { + LOG.info(" input message "+message); + Thread.currentThread().sleep(500); + LOG.info(" After the Sleep "+message); + String responseMessage = message + " Persist into DB "; + return responseMessage; + } + + + public List aggregate(List requestMessage) { + LOG.info(Thread.currentThread().getName()); + LOG.info(" requestMessage aggregate "+requestMessage); + return requestMessage; + } + + + public List splitMessage(String[] splitRequest){ + LOG.info(" Inside splitMessage " +splitRequest); + List splitGBSResponse = new ArrayList(); + splitGBSResponse = Arrays.asList(splitRequest); + return splitGBSResponse; + } +} diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldRestController.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldRestController.java new file mode 100644 index 000000000..bff4c3568 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/HelloWorldRestController.java @@ -0,0 +1,68 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.messaging.PollableChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +@RestController +public class HelloWorldRestController { + + private static final Logger LOG = LoggerFactory.getLogger(HelloWorldRestController.class); + + @Autowired + private ApplicationContext applicationContext; + + @RequestMapping(path = "getHelloWorldMessage", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getHelloWorld() throws Exception { + + LOG.info("Inside getHelloWorldMessage"); + + String[] requestMessage = new String[3]; + requestMessage[0] = "Hellow World Message 1"; + requestMessage[1] = "Hellow World Message 2"; + requestMessage[2] = "Hellow World Message 3"; + + PollableChannel outputChannel = (PollableChannel) applicationContext.getBean("messagingOutputChannel"); + + MessagingGateway messagingGateway = (MessagingGateway) applicationContext + .getBean("messagingGateway"); + + messagingGateway.processMessage(requestMessage); + + GenericMessage reply = (GenericMessage) outputChannel.receive(); + + List body = (List) reply.getPayload(); + + LOG.info(" Response Message " + body); + + return new ResponseEntity(body.toString(), HttpStatus.OK); + } + +} diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/Issue943Tests.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/Issue943Tests.java new file mode 100644 index 000000000..5cee0e955 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/Issue943Tests.java @@ -0,0 +1,70 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +import java.util.stream.Collectors; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.boot.SpringApplication; +import org.springframework.cloud.sleuth.Span; +import org.springframework.cloud.sleuth.Tracer; +import org.springframework.cloud.sleuth.trace.TestSpanContextHolder; +import org.springframework.cloud.sleuth.util.ArrayListSpanAccumulator; +import org.springframework.cloud.sleuth.util.ExceptionUtils; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.web.client.RestTemplate; + +import static org.assertj.core.api.BDDAssertions.then; + +/** + * Example taken from https://github.com/spring-cloud/spring-cloud-sleuth/issues/943 + */ +public class Issue943Tests { + + @Before + public void setup() { + TestSpanContextHolder.removeCurrentSpan(); + } + + @Test + public void should_start_context() { + try (ConfigurableApplicationContext applicationContext = SpringApplication + .run(HelloSpringIntegration.class, "--spring.jmx.enabled=false", "--server.port=0")) { + Span newSpan = applicationContext.getBean(Tracer.class).createSpan("foo"); + RestTemplate restTemplate = applicationContext.getBean(RestTemplate.class); + String object = restTemplate.getForObject( + "http://localhost:" + applicationContext.getEnvironment() + .getProperty("local.server.port") + "/getHelloWorldMessage", + String.class); + ArrayListSpanAccumulator accumulator = applicationContext.getBean(ArrayListSpanAccumulator.class); + + then(object) + .contains("Hellow World Message 1 Persist into DB") + .contains("Hellow World Message 2 Persist into DB") + .contains("Hellow World Message 3 Persist into DB"); + then(accumulator.getSpans().stream() + .filter(span -> span.getTraceId() == newSpan.getTraceId()) + .collect(Collectors.toList())).isNotEmpty() + .extracting("name") + .contains("message:splitterOutChannel", "message:messagingChannel", + "message:messagingProcessedChannel", "message:messagingOutputChannel", + "http:/getHelloWorldMessage"); + } + then(ExceptionUtils.getLastException()).isNull(); + } +} diff --git a/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/MessagingGateway.java b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/MessagingGateway.java new file mode 100644 index 000000000..3fbceca56 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/java/org/springframework/cloud/sleuth/instrument/messaging/issues/issue_943/MessagingGateway.java @@ -0,0 +1,23 @@ +/* + * Copyright 2013-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943; + +public interface MessagingGateway { + + void processMessage(String[] messageArray); + +} diff --git a/spring-cloud-sleuth-core/src/test/resources/beans/applicationContext.xml b/spring-cloud-sleuth-core/src/test/resources/beans/applicationContext.xml new file mode 100644 index 000000000..ab148c220 --- /dev/null +++ b/spring-cloud-sleuth-core/src/test/resources/beans/applicationContext.xml @@ -0,0 +1,83 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file