diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index fb79aec29..91e8418a1 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -17,6 +17,8 @@ 1.10.2 + 2.0.0-SNAPSHOT + 1.0.0-SNAPSHOT @@ -131,7 +133,44 @@ spring-boot-starter-actuator true + + + + io.micrometer + micrometer-core + true + + + io.micrometer + micrometer-tracing-api + true + + + io.micrometer + micrometer-tracing-integration-test + test + + + + + + io.micrometer + micrometer-bom + ${micrometer-bom.version} + pom + import + + + io.micrometer + micrometer-tracing-bom + ${micrometer-tracing-bom.version} + pom + import + + + + diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java new file mode 100644 index 000000000..77484de33 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java @@ -0,0 +1,26 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import io.micrometer.api.instrument.Tags; + +public class DefaultFunctionTagsProvider implements FunctionTagsProvider { + @Override + public Tags getLowCardinalityTags(FunctionContext context) { + return Tags.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.of(context.getTargetFunction().getFunctionDefinition())); + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java new file mode 100644 index 000000000..4f663c076 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java @@ -0,0 +1,93 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import io.micrometer.api.instrument.observation.Observation; +import io.micrometer.api.lang.Nullable; + +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; + +/** + * Context. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class FunctionContext extends Observation.Context { + + private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction; + + private Object input; + + private Object modifiedInput; + + private Object output; + + private Object modifiedOutput; + + public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + this.targetFunction = targetFunction; + } + + public FunctionContext withInput(Object input) { + this.input = input; + this.modifiedInput = input; + return this; + } + + public FunctionContext withOutput(Object output) { + this.output = output; + this.modifiedOutput = output; + return this; + } + + @Nullable + public Object getInput() { + return input; + } + + public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() { + return targetFunction; + } + + @Nullable + public Object getModifiedInput() { + return modifiedInput; + } + + public void setModifiedInput(Object modifiedInput) { + this.modifiedInput = modifiedInput; + } + + @Nullable + public Object getOutput() { + return output; + } + + public void setOutput(Object output) { + this.output = output; + } + + @Nullable + public Object getModifiedOutput() { + return modifiedOutput; + } + + public void setModifiedOutput(Object modifiedOutput) { + this.modifiedOutput = modifiedOutput; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java new file mode 100644 index 000000000..9213f5b36 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java @@ -0,0 +1,62 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import io.micrometer.api.instrument.docs.DocumentedObservation; +import io.micrometer.api.instrument.docs.TagKey; + +enum FunctionObservation implements DocumentedObservation { + + /** + * Observation created around a function execution + */ + FUNCTION_OBSERVATION { + @Override + public String getName() { + return "spring.function"; + } + + @Override + public String getContextualName() { + return "function"; + } + + @Override + public TagKey[] getLowCardinalityTagKeys() { + return FunctionLowCardinalityTags.values(); + } + + @Override + public String getPrefix() { + return "spring.function"; + } + }; + + enum FunctionLowCardinalityTags implements TagKey { + + /** + * Name of the function. + */ + FUNCTION_NAME { + @Override + public String getKey() { + return "spring.function.name"; + } + } + + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java new file mode 100644 index 000000000..ffe30acb4 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java @@ -0,0 +1,32 @@ +/* + * Copyright 2006-2009 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import io.micrometer.api.instrument.observation.Observation; + +/** + * {@link Observation.TagsProvider} for {@link FunctionContext}. + * + * @author Marcin Grzejszczak + */ +public interface FunctionTagsProvider extends Observation.TagsProvider { + + @Override + default boolean supportsContext(Observation.Context context) { + return context instanceof FunctionContext; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java new file mode 100644 index 000000000..a965c74da --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java @@ -0,0 +1,87 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import io.micrometer.api.instrument.observation.Observation; +import io.micrometer.api.instrument.observation.ObservationRegistry; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; + +import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; + +/** + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper implements Observation.TagsProviderAware { + + private static final Log log = LogFactory.getLog(ObservationFunctionAroundWrapper.class); + + private final ObservationRegistry observationRegistry; + + private FunctionTagsProvider tagsProvider = new DefaultFunctionTagsProvider(); + + public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry) { + this.observationRegistry = observationRegistry; + } + + @Override + protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) { + return targetFunction.apply(message); // no instrumentation + } + else if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) { + return reactorStream((Publisher) message, targetFunction); + } + return nonReactorStream(message, targetFunction); + } + + private Object reactorStream(Publisher message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + // TODO + return message; + } + + private Object nonReactorStream(Object message, + SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + FunctionContext context = new FunctionContext(targetFunction).withInput(message); + Object invocationMessage = context.getModifiedInput(); + Object result = Observation + .createNotStarted(FunctionObservation.FUNCTION_OBSERVATION.getName(), context, this.observationRegistry) + .contextualName(FunctionObservation.FUNCTION_OBSERVATION.getContextualName()) + .tagsProvider(this.tagsProvider) + .observe(() -> { + Object r = message == null ? targetFunction.get() : targetFunction.apply(invocationMessage); + context.setOutput(r); + return r; + }); + if (result == null) { + if (log.isDebugEnabled()) { + log.debug("Returned message is null - we have a consumer"); + } + return null; + } + return context.getModifiedOutput(); + } + + @Override + public void setTagsProvider(FunctionTagsProvider functionTagsProvider) { + this.tagsProvider = functionTagsProvider; + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java new file mode 100644 index 000000000..e3f6800fd --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java @@ -0,0 +1,331 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability.tracing; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import io.micrometer.api.instrument.observation.Observation; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.handler.TracingObservationHandler; +import io.micrometer.tracing.propagation.Propagator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; +import org.springframework.cloud.function.context.catalog.observability.FunctionContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.MessagingException; +import org.springframework.messaging.support.ErrorMessage; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.MessageHeaderAccessor; + +/** + * Function Tracing Observation Handler. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class FunctionTracingObservationHandler implements TracingObservationHandler { + + private static final Log log = LogFactory.getLog(FunctionTracingObservationHandler.class); + + /** + * Using the literal "broker" until we come up with a better solution. + * + *

+ * If the message originated from a binder (consumer binding), there will be different + * headers present (e.g. "KafkaHeaders.RECEIVED_TOPIC" Vs. + * "AmqpHeaders.CONSUMER_QUEUE" (unless the application removes them before sending). + * These don't represent the broker, rather a queue, and in any case the heuristics + * are not great. At least we might be able to tell if this is rabbit or not (ex how + * spring-rabbit works). We need to think this through before making an api, possibly + * experimenting. + * + *

+ * If the app is outbound only (producer), there's no indication of what type the + * destination broker is. This may hint at a non-manual solution being overwriting the + * remoteServiceName later, similar to how servlet instrumentation lazy set + * "http.route". + */ + private static final String REMOTE_SERVICE_NAME = "broker"; + + private final Tracer tracer; + + private final Propagator propagator; + + private final Propagator.Getter getter; + + private final Propagator.Setter setter; + + public FunctionTracingObservationHandler(Tracer tracer, Propagator propagator, MessageHeaderPropagatorGetter getter, MessageHeaderPropagatorSetter setter) { + this.tracer = tracer; + this.propagator = propagator; + this.getter = getter; + this.setter = setter; + } + + @Override + public void onStart(FunctionContext context) { + Message message = (Message) context.getInput(); + MessageAndSpans wrappedInputMessage = null; + SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction = context.getTargetFunction(); + Span functionSpan = null; + if (message == null && targetFunction.isSupplier()) { // Supplier + if (log.isDebugEnabled()) { + log.debug("Creating a span for a supplier"); + } + functionSpan = this.tracer.nextSpan().start(); + } + else { + if (log.isDebugEnabled()) { + log.debug("Will retrieve the tracing headers from the message"); + } + // This will create a handle span + wrappedInputMessage = wrapInputMessage(context, message); + if (log.isDebugEnabled()) { + log.debug("Wrapped input msg " + wrappedInputMessage); + } + functionSpan = wrappedInputMessage.childSpan; + } + context.put(MessageAndSpans.class, wrappedInputMessage); + // This is the function span + getTracingContext(context).setSpan(functionSpan); + } + + @Override + public void onStop(FunctionContext context) { + MessageAndSpans invocationMessage = context.get(MessageAndSpans.class); + Span functionSpan = getRequiredSpan(context); + functionSpan.name(context.getTargetFunction().getFunctionDefinition()).end(); + Object result = context.getOutput(); + Message msgResult = toMessage(result); + MessageAndSpan wrappedOutputMessage; + if (log.isDebugEnabled()) { + log.debug("Will instrument the output message"); + } + if (invocationMessage != null) { + wrappedOutputMessage = wrapOutputMessage(msgResult, invocationMessage.parentSpan, context); + } + else { + wrappedOutputMessage = wrapOutputMessage(msgResult, functionSpan, context); + } + if (log.isDebugEnabled()) { + log.debug("Wrapped output msg " + wrappedOutputMessage); + } + wrappedOutputMessage.span.end(); + context.setModifiedOutput(wrappedOutputMessage.msg); + } + +// String inputDestination(String functionDefinition) { +// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { +// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-in-0"; +// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) +// ? this.environment.getProperty(bindingMappingProperty) : s + "-in-0"; +// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); +// }); +// } +// +// String outputDestination(String functionDefinition) { +// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { +// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-out-0"; +// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) +// ? this.environment.getProperty(bindingMappingProperty) : s + "-out-0"; +// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); +// }); +// } + + private Message toMessage(Object result) { + if (!(result instanceof Message)) { + return MessageBuilder.withPayload(result).build(); + } + return (Message) result; + } + + /** + * Wraps the given input message with tracing headers and returns a corresponding + * span. + * @param message - message to wrap + * @return a tuple with the wrapped message and a corresponding span + */ + private MessageAndSpans wrapInputMessage(FunctionContext context, Message message) { + MessageHeaderAccessor headers = mutableHeaderAccessor(message); + Span.Builder consumerSpanBuilder = this.propagator.extract(headers, this.getter); + Span handleSpan = consumerSpan(context, consumerSpanBuilder); + if (log.isDebugEnabled()) { + log.debug("Built a consumer span " + handleSpan); + } + Span functionSpan = tracer.nextSpan(handleSpan).name(context.getContextualName()).start(); + clearTracingHeaders(headers); + if (message instanceof ErrorMessage) { + return new MessageAndSpans(new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()), + handleSpan, functionSpan); + } + headers.setImmutable(); + return new MessageAndSpans(new GenericMessage<>(message.getPayload(), headers.getMessageHeaders()), + handleSpan, functionSpan); + } + + // Handle span + private Span consumerSpan(FunctionContext context, Span.Builder consumerSpanBuilder) { + // TODO: Add this as a documented span + consumerSpanBuilder.kind(Span.Kind.CONSUMER).name("handle"); + consumerSpanBuilder.remoteServiceName(REMOTE_SERVICE_NAME); + // this is the consumer part of the producer->consumer mechanism + Span consumerSpan = consumerSpanBuilder.start(); + tagSpan(context, consumerSpan); + // we're ending this immediately just to have a properly nested graph + consumerSpan.end(); + return consumerSpan; + } + + private MessageHeaderAccessor mutableHeaderAccessor(Message message) { + MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); + if (accessor != null && accessor.isMutable()) { + return accessor; + } + MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message); + headers.setLeaveMutable(true); + return headers; + } + + private void clearTracingHeaders(MessageHeaderAccessor headers) { + MessageHeaderPropagatorSetter.removeAnyTraceHeaders(headers, this.propagator.fields()); + } + + /** + * Wraps the given output message with tracing headers and returns a corresponding + * span. + * @param message - message to wrap + * @return a tuple with the wrapped message and a corresponding span + */ + private MessageAndSpan wrapOutputMessage(Message message, Span parentSpan, FunctionContext context) { + Message retrievedMessage = getMessage(message); + MessageHeaderAccessor headers = mutableHeaderAccessor(retrievedMessage); + Span.Builder sendSpanBuilder = tracer.spanBuilder().setParent(parentSpan.context()); + clearTracingHeaders(headers); + Span sendSpan = createProducerSpan(context, headers, sendSpanBuilder); + this.propagator.inject(sendSpan.context(), headers, this.setter); + if (log.isDebugEnabled()) { + log.debug("Created a new span output message " + sendSpanBuilder); + } + return new MessageAndSpan(outputMessage(message, retrievedMessage, headers), sendSpan); + } + + private Message getMessage(Message message) { + Object payload = message.getPayload(); + if (payload instanceof MessagingException e) { + Message failedMessage = e.getFailedMessage(); + return failedMessage != null ? failedMessage : message; + } + return message; + } + + private Span createProducerSpan(FunctionContext context, MessageHeaderAccessor headers, Span.Builder spanBuilder) { + // TODO: Add documented span for this + spanBuilder.kind(Span.Kind.PRODUCER).name("send").remoteServiceName(toRemoteServiceName(headers)); + Span span = spanBuilder.start(); + if (!span.isNoop()) { + tagSpan(context, span); + } + return span; + } + + private String toRemoteServiceName(MessageHeaderAccessor headers) { + for (String key : headers.getMessageHeaders().keySet()) { + if (key.startsWith("kafka_")) { + return "kafka"; + } + else if (key.startsWith("amqp_")) { + return "rabbitmq"; + } + } + return REMOTE_SERVICE_NAME; + } + + private Message outputMessage(Message originalMessage, Message retrievedMessage, + MessageHeaderAccessor additionalHeaders) { + MessageHeaderAccessor headers = mutableHeaderAccessor(originalMessage); + if (originalMessage instanceof ErrorMessage errorMessage) { + headers.copyHeaders(MessageHeaderPropagatorSetter.propagationHeaders(additionalHeaders.getMessageHeaders(), + this.propagator.fields())); + return new ErrorMessage(errorMessage.getPayload(), isWebSockets(headers) ? headers.getMessageHeaders() + : new MessageHeaders(headers.getMessageHeaders()), errorMessage.getOriginalMessage()); + } + headers.copyHeaders(additionalHeaders.getMessageHeaders()); + return new GenericMessage<>(retrievedMessage.getPayload(), + isWebSockets(headers) ? headers.getMessageHeaders() : new MessageHeaders(headers.getMessageHeaders())); + } + + private boolean isWebSockets(MessageHeaderAccessor headerAccessor) { + return headerAccessor.getMessageHeaders().containsKey("stompCommand") + || headerAccessor.getMessageHeaders().containsKey("simpMessageType"); + } + + @Override + public Tracer getTracer() { + return this.tracer; + } + + @Override + public boolean supportsContext(Observation.Context context) { + return context instanceof FunctionContext && (((FunctionContext) context).getInput() instanceof Message); + } + + private static class MessageAndSpan { + + final Message msg; + + final Span span; + + MessageAndSpan(Message msg, Span span) { + this.msg = msg; + this.span = span; + } + + @Override + public String toString() { + return "MessageAndSpan{" + "msg=" + this.msg + ", span=" + this.span + '}'; + } + + } + + private static class MessageAndSpans { + + final Message msg; + + final Span parentSpan; + + final Span childSpan; + + MessageAndSpans(Message msg, Span parentSpan, Span childSpan) { + this.msg = msg; + this.parentSpan = parentSpan; + this.childSpan = childSpan; + } + + @Override + public String toString() { + return "MessageAndSpans{" + "msg=" + msg + ", parentSpan=" + parentSpan + ", childSpan=" + childSpan + '}'; + } + + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java new file mode 100644 index 000000000..ff5140bcf --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java @@ -0,0 +1,118 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability.tracing; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import io.micrometer.tracing.propagation.Propagator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.messaging.support.NativeMessageHeaderAccessor; +import org.springframework.util.StringUtils; + +/** + * Getter for Spring Integration based communication. + * + * This always sets native headers in defence of STOMP issues discussed here. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class MessageHeaderPropagatorGetter implements Propagator.Getter { + + private static final Log log = LogFactory.getLog(MessageHeaderPropagatorGetter.class); + + @Override + public String get(MessageHeaderAccessor accessor, String key) { + try { + String value = doGet(accessor, key); + if (StringUtils.hasText(value)) { + return value; + } + } + catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); + } + } + return null; + } + + private String doGet(MessageHeaderAccessor accessor, String key) { + if (accessor instanceof NativeMessageHeaderAccessor) { + NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; + Map> nativeHeadersMap = nativeAccessor.toNativeHeaderMap(); + if (!nativeHeadersMap.isEmpty()) { + return getFromNativeHeaders(nativeHeadersMap, key); + } + } + else { + Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); + if (nativeHeaders instanceof Map) { + Map nativeHeadersMap = (Map) nativeHeaders; + if (!nativeHeadersMap.isEmpty()) { + return getFromNativeHeaders(nativeHeadersMap, key); + } + } + } + Set> headerEntries = accessor.getMessageHeaders().entrySet(); + return getFromHeaders(headerEntries, key); + } + + private String getFromHeaders(Set> headerEntries, String key) { + for (Map.Entry entry : headerEntries) { + if (entry.getKey().equalsIgnoreCase(key)) { + Object result = entry.getValue(); + if (result != null) { + if (result instanceof byte[]) { + return new String((byte[]) result, StandardCharsets.UTF_8); + } + return result.toString(); + } + } + } + return null; + } + + private String getFromNativeHeaders(Map nativeHeaders, String key) { + Set entrySet = nativeHeaders.entrySet(); + for (Map.Entry entries : entrySet) { + if (entries.getKey() instanceof String) { + String headersKey = (String) entries.getKey(); + if (headersKey.equalsIgnoreCase(key)) { + Object result = entries.getValue(); + if (result instanceof List && !((List) result).isEmpty()) { + return String.valueOf(((List) result).get(0)); + } + } + } + } + return null; + } + + @Override + public String toString() { + return "MessageHeaderPropagatorGetter{}"; + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java new file mode 100644 index 000000000..7fc434855 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java @@ -0,0 +1,134 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability.tracing; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.micrometer.tracing.propagation.Propagator; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.messaging.support.MessageHeaderAccessor; +import org.springframework.messaging.support.NativeMessageHeaderAccessor; +import org.springframework.util.LinkedMultiValueMap; + +/** + * Setter for Spring Integration based communication. + * + * This always sets native headers in defence of STOMP issues discussed here. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class MessageHeaderPropagatorSetter implements Propagator.Setter { + + private static final Log log = LogFactory.getLog(MessageHeaderPropagatorSetter.class); + + static Map propagationHeaders(Map headers, List propagationHeaders) { + Map headersToCopy = new HashMap<>(); + for (Map.Entry entry : headers.entrySet()) { + if (propagationHeaders.contains(entry.getKey())) { + headersToCopy.put(entry.getKey(), entry.getValue()); + } + } + return headersToCopy; + } + + static void removeAnyTraceHeaders(MessageHeaderAccessor accessor, List keysToRemove) { + for (String keyToRemove : keysToRemove) { + accessor.removeHeader(keyToRemove); + if (accessor instanceof NativeMessageHeaderAccessor) { + NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; + if (accessor.isMutable()) { + // 1184 native headers can be an immutable map + ensureNativeHeadersAreMutable(nativeAccessor).removeNativeHeader(keyToRemove); + } + } + else { + Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); + if (nativeHeaders instanceof Map) { + ((Map) nativeHeaders).remove(keyToRemove); + } + } + } + } + + /** + * Since for some reason, the native headers sometimes are immutable even though the + * accessor says that the headers are mutable, then we have to ensure their + * mutability. We do so by first making a mutable copy of the native headers, then by + * removing the native headers from the headers map and replacing them with a mutable + * copy. Workaround for #1184 + * @param nativeAccessor accessor containing (or not) native headers + * @return modified accessor + */ + private static NativeMessageHeaderAccessor ensureNativeHeadersAreMutable( + NativeMessageHeaderAccessor nativeAccessor) { + Map> nativeHeaderMap = nativeAccessor.toNativeHeaderMap(); + nativeHeaderMap = nativeHeaderMap instanceof LinkedMultiValueMap ? nativeHeaderMap + : new LinkedMultiValueMap<>(nativeHeaderMap); + nativeAccessor.removeHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); + nativeAccessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaderMap); + return nativeAccessor; + } + + @Override + public void set(MessageHeaderAccessor accessor, String key, String value) { + try { + doPut(accessor, key, value); + } + catch (Exception ex) { + if (log.isDebugEnabled()) { + log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); + } + } + } + + private void doPut(MessageHeaderAccessor accessor, String key, String value) { + accessor.setHeader(key, value); + if (accessor instanceof NativeMessageHeaderAccessor) { + NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; + ensureNativeHeadersAreMutable(nativeAccessor).setNativeHeader(key, value); + } + else { + Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); + if (nativeHeaders == null) { + nativeHeaders = new LinkedMultiValueMap<>(); + accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); + } + if (nativeHeaders instanceof Map) { + Map> copy = toNativeHeaderMap((Map>) nativeHeaders); + copy.put(key, Collections.singletonList(value)); + accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, copy); + } + } + } + + private Map> toNativeHeaderMap(Map> map) { + return (map != null ? new LinkedMultiValueMap<>(map) : Collections.emptyMap()); + } + + @Override + public String toString() { + return "MessageHeaderPropagatorSetter{}"; + } + +} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java new file mode 100644 index 000000000..dbd37118a --- /dev/null +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java @@ -0,0 +1,129 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context.catalog.observability; + +import java.util.Collections; +import java.util.Deque; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.api.instrument.observation.ObservationHandler; +import io.micrometer.api.instrument.simple.SimpleMeterRegistry; +import io.micrometer.tracing.Span; +import io.micrometer.tracing.TraceContext; +import io.micrometer.tracing.Tracer; +import io.micrometer.tracing.exporter.FinishedSpan; +import io.micrometer.tracing.propagation.Propagator; +import io.micrometer.tracing.test.SampleTestRunner; +import io.micrometer.tracing.test.reporter.BuildingBlocks; +import io.micrometer.tracing.test.simple.SpanAssert; +import io.micrometer.tracing.test.simple.SpansAssert; + +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; +import org.springframework.cloud.function.context.catalog.observability.tracing.FunctionTracingObservationHandler; +import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorGetter; +import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorSetter; +import org.springframework.cloud.function.context.config.JsonMessageConverter; +import org.springframework.cloud.function.json.JacksonMapper; +import org.springframework.core.convert.support.DefaultConversionService; +import org.springframework.messaging.Message; +import org.springframework.messaging.converter.CompositeMessageConverter; +import org.springframework.messaging.support.MessageBuilder; + +import static org.assertj.core.api.Assertions.assertThat; + +class ObservationFunctionAroundWrapperIntegrationTests extends SampleTestRunner { + + CompositeMessageConverter messageConverter = new CompositeMessageConverter( + Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper())))); + + SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(new DefaultConversionService(), messageConverter, + new JacksonMapper(new ObjectMapper())); + + ObservationFunctionAroundWrapperIntegrationTests() { + super(SampleRunnerConfig.builder().build(), new SimpleMeterRegistry().withTimerObservationHandler()); + } + + @Override + public BiConsumer> customizeObservationHandlers() { + return (buildingBlocks, observationHandlers) -> observationHandlers.addFirst(new FunctionTracingObservationHandler(buildingBlocks.getTracer(), testPropagator(buildingBlocks.getTracer()), new MessageHeaderPropagatorGetter(), new MessageHeaderPropagatorSetter())); + } + + private Propagator testPropagator(Tracer tracer) { + return new Propagator() { + + @Override + public void inject(TraceContext context, C carrier, Setter setter) { + setter.set(carrier, "superHeader", "test"); + } + + @Override + public List fields() { + return Collections.singletonList("superHeader"); + } + + @Override + public Span.Builder extract(C carrier, Getter getter) { + return tracer.spanBuilder(); + } + }; + } + + @Override + public SampleTestRunnerConsumer yourCode() throws Exception { + return (buildingBlocks, meterRegistry) -> { + ObservationFunctionAroundWrapper wrapper = new ObservationFunctionAroundWrapper(meterRegistry); + + // TESTS + + test_tracing_with_function(wrapper, buildingBlocks); + }; + } + + private void test_tracing_with_function(ObservationFunctionAroundWrapper wrapper, BuildingBlocks bb) { + FunctionRegistration registration = new FunctionRegistration<>(new GreeterFunction(), + "greeter").type(FunctionTypeUtils.discoverFunctionTypeFromClass(GreeterFunction.class)); + catalog.register(registration); + SimpleFunctionRegistry.FunctionInvocationWrapper function = catalog.lookup("greeter"); + + Message result = (Message) wrapper + .apply(MessageBuilder.withPayload("hello").setHeader("superHeader", "someValue").build(), function); + + assertThat(result.getPayload()).isEqualTo("HELLO"); + List spans = bb.getFinishedSpans(); + SpansAssert.assertThat(spans) + .haveSameTraceId() + .hasSize(3); + SpanAssert.assertThat(spans.get(0)).hasNameEqualTo("handle").isStarted(); + SpanAssert.assertThat(spans.get(1)).hasNameEqualTo("greeter").isStarted(); + SpanAssert.assertThat(spans.get(2)).hasNameEqualTo("send").isStarted(); + } + + + private static class GreeterFunction implements Function { + + @Override + public String apply(String in) { + return in.toUpperCase(); + } + + } +}