Remove io.micrometer.observation code

This commit is contained in:
spencergibb
2022-03-31 13:05:24 -04:00
parent 2606a317c2
commit 830a7e7cc2
10 changed files with 2 additions and 1036 deletions

View File

@@ -17,8 +17,8 @@
<properties>
<avro.version>1.10.2</avro.version>
<micrometer-bom.version>2.0.0-SNAPSHOT</micrometer-bom.version>
<micrometer-tracing-bom.version>1.0.0-SNAPSHOT</micrometer-tracing-bom.version>
<micrometer-bom.version>2.0.0-M3</micrometer-bom.version>
<micrometer-tracing-bom.version>1.0.0-M3</micrometer-tracing-bom.version>
</properties>
<dependencies>
@@ -135,11 +135,6 @@
</dependency>
<!-- Observability -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation</artifactId>
<!-- <optional>true</optional> -->
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>

View File

@@ -1,26 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import io.micrometer.common.Tags;
public class DefaultFunctionTagsProvider implements FunctionTagsProvider {
@Override
public Tags getLowCardinalityTags(FunctionContext context) {
return Tags.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.of(context.getTargetFunction().getFunctionDefinition()));
}
}

View File

@@ -1,94 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import io.micrometer.core.lang.Nullable;
import io.micrometer.observation.Observation;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
/**
* Context.
*
* @author Marcin Grzejszczak
* @since 4.0.0
*/
public class FunctionContext extends Observation.Context {
private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction;
private Object input;
private Object modifiedInput;
private Object output;
private Object modifiedOutput;
public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
this.targetFunction = targetFunction;
}
public FunctionContext withInput(Object input) {
this.input = input;
this.modifiedInput = input;
return this;
}
public FunctionContext withOutput(Object output) {
this.output = output;
this.modifiedOutput = output;
return this;
}
@Nullable
public Object getInput() {
return input;
}
public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() {
return targetFunction;
}
@Nullable
public Object getModifiedInput() {
return modifiedInput;
}
public void setModifiedInput(Object modifiedInput) {
this.modifiedInput = modifiedInput;
}
@Nullable
public Object getOutput() {
return output;
}
public void setOutput(Object output) {
this.output = output;
}
@Nullable
public Object getModifiedOutput() {
return modifiedOutput;
}
public void setModifiedOutput(Object modifiedOutput) {
this.modifiedOutput = modifiedOutput;
}
}

View File

@@ -1,66 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import io.micrometer.common.docs.TagKey;
import io.micrometer.observation.docs.DocumentedObservation;
/**
* @author Marcin Grzejszczak
* @author Oleg Zhurakousky
* @since 4.0.0
*/
enum FunctionObservation implements DocumentedObservation {
/**
* Observation created around a function execution.
*/
FUNCTION_OBSERVATION {
@Override
public String getName() {
return "spring.cloud.function";
}
@Override
public String getContextualName() {
return "function";
}
@Override
public TagKey[] getLowCardinalityTagKeys() {
return FunctionLowCardinalityTags.values();
}
@Override
public String getPrefix() {
return "spring.cloud.function";
}
};
enum FunctionLowCardinalityTags implements TagKey {
/**
* Name of the function.
*/
FUNCTION_NAME {
@Override
public String getKey() {
return "spring.cloud.function.name";
}
}
}
}

View File

@@ -1,32 +0,0 @@
/*
* Copyright 2006-2009 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import io.micrometer.observation.Observation;
/**
* {@link Observation.TagsProvider} for {@link FunctionContext}.
*
* @author Marcin Grzejszczak
*/
public interface FunctionTagsProvider extends Observation.TagsProvider<FunctionContext> {
@Override
default boolean supportsContext(Observation.Context context) {
return context instanceof FunctionContext;
}
}

View File

@@ -1,89 +0,0 @@
/*
* Copyright 2012-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
/**
* @author Marcin Grzejszczak
* @since 4.0.0
*/
public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper implements Observation.TagsProviderAware<FunctionTagsProvider> {
private static final Log log = LogFactory.getLog(ObservationFunctionAroundWrapper.class);
private final ObservationRegistry observationRegistry;
private FunctionTagsProvider tagsProvider = new DefaultFunctionTagsProvider();
public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry) {
this.observationRegistry = observationRegistry;
}
@Override
protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) {
return targetFunction.apply(message); // no instrumentation
}
else if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) {
return reactorStream((Publisher) message, targetFunction);
}
return nonReactorStream(message, targetFunction);
}
private Object reactorStream(Publisher message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
// TODO
return message;
}
private Object nonReactorStream(Object message,
SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
FunctionContext context = new FunctionContext(targetFunction).withInput(message);
Object invocationMessage = context.getModifiedInput();
Object result = Observation
.createNotStarted(FunctionObservation.FUNCTION_OBSERVATION.getName(), context, this.observationRegistry)
.contextualName(FunctionObservation.FUNCTION_OBSERVATION.getContextualName())
.tagsProvider(this.tagsProvider)
.observe(() -> {
Object r = message == null ? targetFunction.get() : targetFunction.apply(invocationMessage);
context.setOutput(r);
return r;
});
if (result == null) {
if (log.isDebugEnabled()) {
log.debug("Returned message is null - we have a consumer");
}
return null;
}
return context.getModifiedOutput();
}
@Override
public void setTagsProvider(FunctionTagsProvider functionTagsProvider) {
this.tagsProvider = functionTagsProvider;
}
}

View File

@@ -1,334 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability.tracing;
import io.micrometer.observation.Observation;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.handler.TracingObservationHandler;
import io.micrometer.tracing.propagation.Propagator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.catalog.observability.FunctionContext;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.StringUtils;
/**
* Function Tracing Observation Handler.
*
* @author Marcin Grzejszczak
* @author Oleg Zhurakousky
* @since 4.0.0
*/
public class FunctionTracingObservationHandler implements TracingObservationHandler<FunctionContext> {
private static final Log log = LogFactory.getLog(FunctionTracingObservationHandler.class);
/**
* Using the literal "broker" until we come up with a better solution.
*
* <p>
* If the message originated from a binder (consumer binding), there will be different
* headers present (e.g. "KafkaHeaders.RECEIVED_TOPIC" Vs.
* "AmqpHeaders.CONSUMER_QUEUE" (unless the application removes them before sending).
* These don't represent the broker, rather a queue, and in any case the heuristics
* are not great. At least we might be able to tell if this is rabbit or not (ex how
* spring-rabbit works). We need to think this through before making an api, possibly
* experimenting.
*
* <p>
* If the app is outbound only (producer), there's no indication of what type the
* destination broker is. This may hint at a non-manual solution being overwriting the
* remoteServiceName later, similar to how servlet instrumentation lazy set
* "http.route".
*/
private static final String REMOTE_SERVICE_NAME = "broker";
private final Tracer tracer;
private final Propagator propagator;
private final Propagator.Getter<MessageHeaderAccessor> getter;
private final Propagator.Setter<MessageHeaderAccessor> setter;
public FunctionTracingObservationHandler(Tracer tracer, Propagator propagator, MessageHeaderPropagatorGetter getter, MessageHeaderPropagatorSetter setter) {
this.tracer = tracer;
this.propagator = propagator;
this.getter = getter;
this.setter = setter;
}
@Override
public void onStart(FunctionContext context) {
Message<?> message = (Message<?>) context.getInput();
MessageAndSpans wrappedInputMessage = null;
SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction = context.getTargetFunction();
Span functionSpan = null;
if (message == null && targetFunction.isSupplier()) { // Supplier
if (log.isDebugEnabled()) {
log.debug("Creating a span for a supplier");
}
functionSpan = this.tracer.nextSpan().start();
}
else {
if (log.isDebugEnabled()) {
log.debug("Will retrieve the tracing headers from the message");
}
// This will create a handle span
wrappedInputMessage = wrapInputMessage(context, message);
if (log.isDebugEnabled()) {
log.debug("Wrapped input msg " + wrappedInputMessage);
}
functionSpan = wrappedInputMessage.childSpan;
}
context.put(MessageAndSpans.class, wrappedInputMessage);
// This is the function span
getTracingContext(context).setSpan(functionSpan);
}
@Override
public void onStop(FunctionContext context) {
MessageAndSpans invocationMessage = context.get(MessageAndSpans.class);
Span functionSpan = getRequiredSpan(context);
functionSpan.name(context.getTargetFunction().getFunctionDefinition()).end();
Object result = context.getOutput();
Message<?> msgResult = toMessage(result);
MessageAndSpan wrappedOutputMessage;
if (log.isDebugEnabled()) {
log.debug("Will instrument the output message");
}
if (invocationMessage != null) {
wrappedOutputMessage = wrapOutputMessage(msgResult, invocationMessage.parentSpan, context);
}
else {
wrappedOutputMessage = wrapOutputMessage(msgResult, functionSpan, context);
}
if (log.isDebugEnabled()) {
log.debug("Wrapped output msg " + wrappedOutputMessage);
}
wrappedOutputMessage.span.end();
context.setModifiedOutput(wrappedOutputMessage.msg);
}
// String inputDestination(String functionDefinition) {
// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> {
// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-in-0";
// String bindingProperty = this.environment.containsProperty(bindingMappingProperty)
// ? this.environment.getProperty(bindingMappingProperty) : s + "-in-0";
// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s);
// });
// }
//
// String outputDestination(String functionDefinition) {
// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> {
// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-out-0";
// String bindingProperty = this.environment.containsProperty(bindingMappingProperty)
// ? this.environment.getProperty(bindingMappingProperty) : s + "-out-0";
// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s);
// });
// }
private Message<?> toMessage(Object result) {
if (!(result instanceof Message)) {
return MessageBuilder.withPayload(result).build();
}
return (Message<?>) result;
}
/**
* Wraps the given input message with tracing headers and returns a corresponding
* span.
* @param message - message to wrap
* @return a tuple with the wrapped message and a corresponding span
*/
private MessageAndSpans wrapInputMessage(FunctionContext context, Message<?> message) {
MessageHeaderAccessor headers = mutableHeaderAccessor(message);
Span.Builder consumerSpanBuilder = this.propagator.extract(headers, this.getter);
Span handleSpan = consumerSpan(context, consumerSpanBuilder);
if (log.isDebugEnabled()) {
log.debug("Built a consumer span " + handleSpan);
}
Span functionSpan = tracer.nextSpan(handleSpan).name(context.getContextualName()).start();
clearTracingHeaders(headers);
if (message instanceof ErrorMessage) {
return new MessageAndSpans(new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()),
handleSpan, functionSpan);
}
headers.setImmutable();
return new MessageAndSpans(new GenericMessage<>(message.getPayload(), headers.getMessageHeaders()),
handleSpan, functionSpan);
}
// Handle span
private Span consumerSpan(FunctionContext context, Span.Builder consumerSpanBuilder) {
// TODO: Add this as a documented span
consumerSpanBuilder.kind(Span.Kind.CONSUMER).name("handle");
consumerSpanBuilder.remoteServiceName(REMOTE_SERVICE_NAME);
// this is the consumer part of the producer->consumer mechanism
Span consumerSpan = consumerSpanBuilder.start();
tagSpan(context, consumerSpan);
// we're ending this immediately just to have a properly nested graph
consumerSpan.end();
return consumerSpan;
}
private MessageHeaderAccessor mutableHeaderAccessor(Message<?> message) {
MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class);
if (accessor != null && accessor.isMutable()) {
return accessor;
}
MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message);
headers.setLeaveMutable(true);
return headers;
}
private void clearTracingHeaders(MessageHeaderAccessor headers) {
MessageHeaderPropagatorSetter.removeHeaders(headers, this.propagator.fields());
}
/**
* Wraps the given output message with tracing headers and returns a corresponding
* span.
* @param message - message to wrap
* @return a tuple with the wrapped message and a corresponding span
*/
private MessageAndSpan wrapOutputMessage(Message<?> message, Span parentSpan, FunctionContext context) {
Message<?> retrievedMessage = getMessage(message);
MessageHeaderAccessor headers = mutableHeaderAccessor(retrievedMessage);
Span.Builder sendSpanBuilder = tracer.spanBuilder().setParent(parentSpan.context());
clearTracingHeaders(headers);
Span sendSpan = createProducerSpan(context, headers, sendSpanBuilder);
this.propagator.inject(sendSpan.context(), headers, this.setter);
if (log.isDebugEnabled()) {
log.debug("Created a new span output message " + sendSpanBuilder);
}
return new MessageAndSpan(outputMessage(message, retrievedMessage, headers), sendSpan);
}
private Message<?> getMessage(Message<?> message) {
Object payload = message.getPayload();
if (payload instanceof MessagingException e) {
Message<?> failedMessage = e.getFailedMessage();
return failedMessage != null ? failedMessage : message;
}
return message;
}
private Span createProducerSpan(FunctionContext context, MessageHeaderAccessor headers, Span.Builder spanBuilder) {
// TODO: Add documented span for this
spanBuilder.kind(Span.Kind.PRODUCER).name("send").remoteServiceName(toRemoteServiceName(headers));
Span span = spanBuilder.start();
if (!span.isNoop()) {
tagSpan(context, span);
}
return span;
}
private String toRemoteServiceName(MessageHeaderAccessor headers) {
// for (String key : headers.getMessageHeaders().keySet()) {
// if (key.startsWith("kafka_")) {
// return "kafka";
// }
// else if (key.startsWith("amqp_")) {
// return "rabbitmq";
// }
// }
String serviceName = (String) headers.getHeader(MessageUtils.TARGET_PROTOCOL);
if (!StringUtils.hasLength(serviceName)) {
serviceName = REMOTE_SERVICE_NAME;
}
return serviceName;
}
private Message<?> outputMessage(Message<?> originalMessage, Message<?> retrievedMessage,
MessageHeaderAccessor additionalHeaders) {
MessageHeaderAccessor headers = mutableHeaderAccessor(originalMessage);
if (originalMessage instanceof ErrorMessage errorMessage) {
headers.copyHeaders(MessageHeaderPropagatorSetter.copyHeaders(additionalHeaders.getMessageHeaders(),
this.propagator.fields()));
return new ErrorMessage(errorMessage.getPayload(), isWebSockets(headers) ? headers.getMessageHeaders()
: new MessageHeaders(headers.getMessageHeaders()), errorMessage.getOriginalMessage());
}
headers.copyHeaders(additionalHeaders.getMessageHeaders());
return new GenericMessage<>(retrievedMessage.getPayload(),
isWebSockets(headers) ? headers.getMessageHeaders() : new MessageHeaders(headers.getMessageHeaders()));
}
private boolean isWebSockets(MessageHeaderAccessor headerAccessor) {
return headerAccessor.getMessageHeaders().containsKey("stompCommand")
|| headerAccessor.getMessageHeaders().containsKey("simpMessageType");
}
@Override
public Tracer getTracer() {
return this.tracer;
}
// @Override
public boolean supportsContext(Observation.Context context) {
return context instanceof FunctionContext && (((FunctionContext) context).getInput() instanceof Message<?>);
}
private static class MessageAndSpan {
final Message msg;
final Span span;
MessageAndSpan(Message msg, Span span) {
this.msg = msg;
this.span = span;
}
@Override
public String toString() {
return "MessageAndSpan{" + "msg=" + this.msg + ", span=" + this.span + '}';
}
}
private static class MessageAndSpans {
final Message msg;
final Span parentSpan;
final Span childSpan;
MessageAndSpans(Message msg, Span parentSpan, Span childSpan) {
this.msg = msg;
this.parentSpan = parentSpan;
this.childSpan = childSpan;
}
@Override
public String toString() {
return "MessageAndSpans{" + "msg=" + msg + ", parentSpan=" + parentSpan + ", childSpan=" + childSpan + '}';
}
}
}

View File

@@ -1,118 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability.tracing;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import io.micrometer.tracing.propagation.Propagator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.util.StringUtils;
/**
* Getter for Spring Integration based communication.
*
* This always sets native headers in defence of STOMP issues discussed <a href=
* "https://github.com/spring-cloud/spring-cloud-sleuth/issues/716#issuecomment-337523705">here</a>.
*
* @author Marcin Grzejszczak
* @since 4.0.0
*/
public class MessageHeaderPropagatorGetter implements Propagator.Getter<MessageHeaderAccessor> {
private static final Log log = LogFactory.getLog(MessageHeaderPropagatorGetter.class);
@Override
public String get(MessageHeaderAccessor accessor, String key) {
try {
String value = doGet(accessor, key);
if (StringUtils.hasText(value)) {
return value;
}
}
catch (Exception ex) {
if (log.isDebugEnabled()) {
log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex);
}
}
return null;
}
private String doGet(MessageHeaderAccessor accessor, String key) {
if (accessor instanceof NativeMessageHeaderAccessor) {
NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor;
Map<String, List<String>> nativeHeadersMap = nativeAccessor.toNativeHeaderMap();
if (!nativeHeadersMap.isEmpty()) {
return getFromNativeHeaders(nativeHeadersMap, key);
}
}
else {
Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (nativeHeaders instanceof Map) {
Map nativeHeadersMap = (Map) nativeHeaders;
if (!nativeHeadersMap.isEmpty()) {
return getFromNativeHeaders(nativeHeadersMap, key);
}
}
}
Set<Map.Entry<String, Object>> headerEntries = accessor.getMessageHeaders().entrySet();
return getFromHeaders(headerEntries, key);
}
private String getFromHeaders(Set<Map.Entry<String, Object>> headerEntries, String key) {
for (Map.Entry<String, Object> entry : headerEntries) {
if (entry.getKey().equalsIgnoreCase(key)) {
Object result = entry.getValue();
if (result != null) {
if (result instanceof byte[]) {
return new String((byte[]) result, StandardCharsets.UTF_8);
}
return result.toString();
}
}
}
return null;
}
private String getFromNativeHeaders(Map nativeHeaders, String key) {
Set<Map.Entry> entrySet = nativeHeaders.entrySet();
for (Map.Entry entries : entrySet) {
if (entries.getKey() instanceof String) {
String headersKey = (String) entries.getKey();
if (headersKey.equalsIgnoreCase(key)) {
Object result = entries.getValue();
if (result instanceof List && !((List) result).isEmpty()) {
return String.valueOf(((List) result).get(0));
}
}
}
}
return null;
}
@Override
public String toString() {
return "MessageHeaderPropagatorGetter{}";
}
}

View File

@@ -1,134 +0,0 @@
/*
* Copyright 2013-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability.tracing;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.micrometer.tracing.propagation.Propagator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.util.LinkedMultiValueMap;
/**
* Setter for Spring Integration based communication.
*
* This always sets native headers in defense of STOMP issues discussed <a href=
* "https://github.com/spring-cloud/spring-cloud-sleuth/issues/716#issuecomment-337523705">here</a>.
*
* @author Marcin Grzejszczak
* @author Oleg Zhurakousky
* @since 4.0.0
*/
public class MessageHeaderPropagatorSetter implements Propagator.Setter<MessageHeaderAccessor> {
private static final Log log = LogFactory.getLog(MessageHeaderPropagatorSetter.class);
static Map<String, ?> copyHeaders(Map<String, ?> headers, List<String> headersToCopy) {
Map<String, Object> copiedHeaders = new HashMap<>();
for (Map.Entry<String, ?> entry : headers.entrySet()) {
if (headersToCopy.contains(entry.getKey())) {
copiedHeaders.put(entry.getKey(), entry.getValue());
}
}
return copiedHeaders;
}
static void removeHeaders(MessageHeaderAccessor accessor, List<String> keysToRemove) {
for (String keyToRemove : keysToRemove) {
accessor.removeHeader(keyToRemove);
if (accessor instanceof NativeMessageHeaderAccessor) {
NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor;
if (accessor.isMutable()) {
// 1184 native headers can be an immutable map
ensureNativeHeadersAreMutable(nativeAccessor).removeNativeHeader(keyToRemove);
}
}
else {
Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (nativeHeaders instanceof Map) {
((Map) nativeHeaders).remove(keyToRemove);
}
}
}
}
/**
* Since for some reason, the native headers sometimes are immutable even though the
* accessor says that the headers are mutable, then we have to ensure their
* mutability. We do so by first making a mutable copy of the native headers, then by
* removing the native headers from the headers map and replacing them with a mutable
* copy. Workaround for #1184
* @param nativeAccessor accessor containing (or not) native headers
* @return modified accessor
*/
private static NativeMessageHeaderAccessor ensureNativeHeadersAreMutable(
NativeMessageHeaderAccessor nativeAccessor) {
Map<String, List<String>> nativeHeaderMap = nativeAccessor.toNativeHeaderMap();
nativeHeaderMap = nativeHeaderMap instanceof LinkedMultiValueMap ? nativeHeaderMap
: new LinkedMultiValueMap<>(nativeHeaderMap);
nativeAccessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaderMap);
return nativeAccessor;
}
@Override
public void set(MessageHeaderAccessor accessor, String key, String value) {
try {
doPut(accessor, key, value);
}
catch (Exception ex) {
if (log.isDebugEnabled()) {
log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex);
}
}
}
private void doPut(MessageHeaderAccessor accessor, String key, String value) {
accessor.setHeader(key, value);
if (accessor instanceof NativeMessageHeaderAccessor) {
NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor;
ensureNativeHeadersAreMutable(nativeAccessor).setNativeHeader(key, value);
}
else {
Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS);
if (nativeHeaders == null) {
nativeHeaders = new LinkedMultiValueMap<>();
accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders);
}
if (nativeHeaders instanceof Map<?, ?>) {
Map<String, List<String>> copy = toNativeHeaderMap((Map<String, List<String>>) nativeHeaders);
copy.put(key, Collections.singletonList(value));
accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, copy);
}
}
}
private Map<String, List<String>> toNativeHeaderMap(Map<String, List<String>> map) {
return (map != null ? new LinkedMultiValueMap<>(map) : Collections.emptyMap());
}
@Override
public String toString() {
return "MessageHeaderPropagatorSetter{}";
}
}

View File

@@ -1,136 +0,0 @@
/*
* Copyright 2019-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.catalog.observability;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.observation.TimerObservationHandler;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.tracing.Span;
import io.micrometer.tracing.TraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.exporter.FinishedSpan;
import io.micrometer.tracing.propagation.Propagator;
import io.micrometer.tracing.test.SampleTestRunner;
import io.micrometer.tracing.test.reporter.BuildingBlocks;
import io.micrometer.tracing.test.simple.SpanAssert;
import io.micrometer.tracing.test.simple.SpansAssert;
import org.junit.jupiter.api.Disabled;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.function.context.catalog.observability.tracing.FunctionTracingObservationHandler;
import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorGetter;
import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorSetter;
import org.springframework.cloud.function.context.config.JsonMessageConverter;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
@Disabled
class ObservationFunctionAroundWrapperIntegrationTests extends SampleTestRunner {
CompositeMessageConverter messageConverter = new CompositeMessageConverter(
Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper()))));
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(new DefaultConversionService(), messageConverter,
new JacksonMapper(new ObjectMapper()));
ObservationFunctionAroundWrapperIntegrationTests() {
MeterRegistry meterRegistry = new SimpleMeterRegistry();
ObservationRegistry registry = (ObservationRegistry) ObservationRegistry.create().observationConfig().observationHandler(new TimerObservationHandler(meterRegistry));
//super(SampleRunnerConfig.builder().build(), new SimpleMeterRegistry().withTimerObservationHandler());
}
@Override
public BiConsumer<BuildingBlocks, Deque<ObservationHandler>> customizeObservationHandlers() {
return (buildingBlocks, observationHandlers) -> observationHandlers.addFirst(new FunctionTracingObservationHandler(buildingBlocks.getTracer(), testPropagator(buildingBlocks.getTracer()), new MessageHeaderPropagatorGetter(), new MessageHeaderPropagatorSetter()));
}
private Propagator testPropagator(Tracer tracer) {
return new Propagator() {
@Override
public <C> void inject(TraceContext context, C carrier, Setter<C> setter) {
setter.set(carrier, "superHeader", "test");
}
@Override
public List<String> fields() {
return Collections.singletonList("superHeader");
}
@Override
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
return tracer.spanBuilder();
}
};
}
@Override
public SampleTestRunnerConsumer yourCode() throws Exception {
return (buildingBlocks, meterRegistry) -> {
ObservationFunctionAroundWrapper wrapper = new ObservationFunctionAroundWrapper((ObservationRegistry) meterRegistry);
// TESTS
test_tracing_with_function(wrapper, buildingBlocks);
};
}
private void test_tracing_with_function(ObservationFunctionAroundWrapper wrapper, BuildingBlocks bb) {
FunctionRegistration<GreeterFunction> registration = new FunctionRegistration<>(new GreeterFunction(),
"greeter").type(FunctionTypeUtils.discoverFunctionTypeFromClass(GreeterFunction.class));
catalog.register(registration);
SimpleFunctionRegistry.FunctionInvocationWrapper function = catalog.lookup("greeter");
Message<?> result = (Message<?>) wrapper
.apply(MessageBuilder.withPayload("hello").setHeader("superHeader", "someValue").build(), function);
assertThat(result.getPayload()).isEqualTo("HELLO");
List<FinishedSpan> spans = bb.getFinishedSpans();
SpansAssert.assertThat(spans)
.haveSameTraceId()
.hasSize(3);
SpanAssert.assertThat(spans.get(0)).hasNameEqualTo("handle").isStarted();
SpanAssert.assertThat(spans.get(1)).hasNameEqualTo("greeter").isStarted();
SpanAssert.assertThat(spans.get(2)).hasNameEqualTo("send").isStarted();
}
private static class GreeterFunction implements Function<String, String> {
@Override
public String apply(String in) {
return in.toUpperCase();
}
}
}