Reuses the span from a message only for non direct channels; fixes gh-943 (#945)

This commit is contained in:
Marcin Grzejszczak
2018-04-16 10:37:58 +02:00
committed by GitHub
parent baaade76db
commit 87ed7b8291
8 changed files with 433 additions and 6 deletions

View File

@@ -43,6 +43,8 @@ import org.springframework.messaging.support.MessageHeaderAccessor;
*/
public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
private static final String ASYNC_COMPONENT = "async";
private static final org.apache.commons.logging.Log log = LogFactory
.getLog(TraceChannelInterceptor.class);
@@ -61,8 +63,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
Message<?> retrievedMessage = getMessage(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);
Span currentSpan = getTracer().isTracing() ? getTracer().getCurrentSpan()
: buildSpan(new MessagingTextMap(messageBuilder));
Span currentSpan = currentSpanOrFromHeaders(messageBuilder);
if (log.isDebugEnabled()) {
log.debug("Completed sending and current span is " + currentSpan);
}
@@ -83,6 +84,36 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
}
private Span extractSpanOrPickCurrent(MessageBuilder<?> messageBuilder,
MessageChannel channel) {
if (isDirectChannel(channel)) {
return currentSpanOrFromHeaders(messageBuilder);
}
Span spanFromMessage = spanFromHeaders(messageBuilder);
Span currentSpan = getTracer().getCurrentSpan();
if (currentSpan != null && currentSpan.equals(spanFromMessage)) {
return currentSpan;
} else if (spanComesFromAsync(currentSpan)) {
getTracer().detach(currentSpan);
getTracer().continueSpan(spanFromMessage);
}
return spanFromMessage;
}
private boolean spanComesFromAsync(Span currentSpan) {
return currentSpan != null && currentSpan.getSpanId() == currentSpan.getTraceId()
&& currentSpan.getName().equals(ASYNC_COMPONENT);
}
private Span currentSpanOrFromHeaders(MessageBuilder<?> messageBuilder) {
return getTracer().isTracing() ? getTracer().getCurrentSpan()
: spanFromHeaders(messageBuilder);
}
private Span spanFromHeaders(MessageBuilder<?> messageBuilder) {
return buildSpan(new MessagingTextMap(messageBuilder));
}
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (emptyMessage(message)) {
@@ -93,8 +124,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
Message<?> retrievedMessage = getMessage(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);
Span parentSpan = getTracer().isTracing() ? getTracer().getCurrentSpan()
: buildSpan(new MessagingTextMap(messageBuilder));
Span parentSpan = currentSpanOrFromHeaders(messageBuilder);
// Do not continue the parent (assume that this is handled by caller)
// getTracer().continueSpan(parentSpan);
if (log.isDebugEnabled()) {
@@ -184,8 +214,7 @@ public class TraceChannelInterceptor extends AbstractTraceChannelInterceptor {
}
Message<?> retrievedMessage = getMessage(message);
MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(retrievedMessage);
Span spanFromHeader = getTracer().isTracing() ? getTracer().getCurrentSpan()
: buildSpan(new MessagingTextMap(messageBuilder));
Span spanFromHeader = extractSpanOrPickCurrent(messageBuilder, channel);
if (log.isDebugEnabled()) {
log.debug("Continuing span " + spanFromHeader + " before handling message");
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
import java.util.concurrent.Executor;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.instrument.async.LazyTraceExecutor;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@EnableAsync
@Configuration
public class CustomExecutorConfig extends AsyncConfigurerSupport {
@Autowired BeanFactory beanFactory;
@Override public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// CUSTOMIZE HERE
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("MyExecutor-");
// DON'T FORGET TO INITIALIZE
executor.initialize();
return new LazyTraceExecutor(this.beanFactory, executor);
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration;
import org.springframework.cloud.sleuth.sampler.AlwaysSampler;
import org.springframework.cloud.sleuth.util.ArrayListSpanAccumulator;
import org.springframework.cloud.sleuth.util.ExceptionUtils;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class,HibernateJpaAutoConfiguration.class})
@ImportResource("classpath:beans/applicationContext.xml")
@EnableIntegration
@EnableAsync
public class HelloSpringIntegration {
public static void main(String[] args) {
ExceptionUtils.setFail(true);
SpringApplication.run(HelloSpringIntegration.class, args);
}
@Bean AlwaysSampler sampler() {
return new AlwaysSampler();
}
@Bean RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean ArrayListSpanAccumulator accumulator() {
return new ArrayListSpanAccumulator();
}
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloWorldImpl {
private static final Logger LOG = LoggerFactory.getLogger(HelloWorldImpl.class);
public String invokeProcessor(String message) throws InterruptedException {
LOG.info(" input message "+message);
Thread.currentThread().sleep(500);
LOG.info(" After the Sleep "+message);
String responseMessage = message + " Persist into DB ";
return responseMessage;
}
public List<String> aggregate(List<String> requestMessage) {
LOG.info(Thread.currentThread().getName());
LOG.info(" requestMessage aggregate "+requestMessage);
return requestMessage;
}
public List<String> splitMessage(String[] splitRequest){
LOG.info(" Inside splitMessage " +splitRequest);
List<String> splitGBSResponse = new ArrayList<String>();
splitGBSResponse = Arrays.asList(splitRequest);
return splitGBSResponse;
}
}

View File

@@ -0,0 +1,68 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class HelloWorldRestController {
private static final Logger LOG = LoggerFactory.getLogger(HelloWorldRestController.class);
@Autowired
private ApplicationContext applicationContext;
@RequestMapping(path = "getHelloWorldMessage", method = RequestMethod.GET, produces = MediaType.TEXT_PLAIN_VALUE)
public ResponseEntity<String> getHelloWorld() throws Exception {
LOG.info("Inside getHelloWorldMessage");
String[] requestMessage = new String[3];
requestMessage[0] = "Hellow World Message 1";
requestMessage[1] = "Hellow World Message 2";
requestMessage[2] = "Hellow World Message 3";
PollableChannel outputChannel = (PollableChannel) applicationContext.getBean("messagingOutputChannel");
MessagingGateway messagingGateway = (MessagingGateway) applicationContext
.getBean("messagingGateway");
messagingGateway.processMessage(requestMessage);
GenericMessage reply = (GenericMessage) outputChannel.receive();
List<String> body = (List<String>) reply.getPayload();
LOG.info(" Response Message " + body);
return new ResponseEntity<String>(body.toString(), HttpStatus.OK);
}
}

View File

@@ -0,0 +1,70 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.trace.TestSpanContextHolder;
import org.springframework.cloud.sleuth.util.ArrayListSpanAccumulator;
import org.springframework.cloud.sleuth.util.ExceptionUtils;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.web.client.RestTemplate;
import static org.assertj.core.api.BDDAssertions.then;
/**
* Example taken from https://github.com/spring-cloud/spring-cloud-sleuth/issues/943
*/
public class Issue943Tests {
@Before
public void setup() {
TestSpanContextHolder.removeCurrentSpan();
}
@Test
public void should_start_context() {
try (ConfigurableApplicationContext applicationContext = SpringApplication
.run(HelloSpringIntegration.class, "--spring.jmx.enabled=false", "--server.port=0")) {
Span newSpan = applicationContext.getBean(Tracer.class).createSpan("foo");
RestTemplate restTemplate = applicationContext.getBean(RestTemplate.class);
String object = restTemplate.getForObject(
"http://localhost:" + applicationContext.getEnvironment()
.getProperty("local.server.port") + "/getHelloWorldMessage",
String.class);
ArrayListSpanAccumulator accumulator = applicationContext.getBean(ArrayListSpanAccumulator.class);
then(object)
.contains("Hellow World Message 1 Persist into DB")
.contains("Hellow World Message 2 Persist into DB")
.contains("Hellow World Message 3 Persist into DB");
then(accumulator.getSpans().stream()
.filter(span -> span.getTraceId() == newSpan.getTraceId())
.collect(Collectors.toList())).isNotEmpty()
.extracting("name")
.contains("message:splitterOutChannel", "message:messagingChannel",
"message:messagingProcessedChannel", "message:messagingOutputChannel",
"http:/getHelloWorldMessage");
}
then(ExceptionUtils.getLastException()).isNull();
}
}

View File

@@ -0,0 +1,23 @@
/*
* Copyright 2013-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943;
public interface MessagingGateway {
void processMessage(String[] messageArray);
}

View File

@@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2013-2018 the original author or authors.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:task="http://www.springframework.org/schema/task" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
<int:channel id="messagingChannel" />
<int:channel id="splitterOutChannel" >
<int:queue capacity="10" />
</int:channel>
<int:channel id="messagingProcessedChannel">
<int:queue capacity="10" />
</int:channel>
<int:channel id="messagingOutputChannel">
<int:queue capacity="10" />
</int:channel>
<int:service-activator input-channel="splitterOutChannel"
method="invokeProcessor" output-channel="messagingProcessedChannel"
ref="helloWorldImpl">
<int:poller max-messages-per-poll="1" fixed-rate="1"
task-executor="requestExecutor" />
</int:service-activator>
<int:aggregator input-channel="messagingProcessedChannel"
output-channel="messagingOutputChannel"
message-store="messageStore"
send-partial-result-on-expiry="true">
<int:poller max-messages-per-poll="1" fixed-rate="1"
task-executor="requestExecutor" />
</int:aggregator>
<!-- Define a store for our search results and set up a reaper that will
periodically expire those results. -->
<bean id="messageStore"
class="org.springframework.integration.store.SimpleMessageStore" />
<bean id="messageStoreReaper"
class="org.springframework.integration.store.MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore" />
<property name="timeout" value="2000" />
</bean>
<int:gateway id="messagingGateway"
service-interface="org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943.MessagingGateway"
default-request-channel="messagingChannel" default-reply-timeout="10000"
default-reply-channel="splitterOutChannel" />
<task:executor id="requestExecutor" pool-size="10"/>
<int:splitter ref="helloWorldImpl" method="splitMessage"
input-channel="messagingChannel" output-channel="splitterOutChannel" />
<bean id="helloWorldImpl" class="org.springframework.cloud.sleuth.instrument.messaging.issues.issue_943.HelloWorldImpl" />
</beans>