From 830a7e7cc2c8a90650da5dd6b9191cfbee7bb6ed Mon Sep 17 00:00:00 2001 From: spencergibb Date: Thu, 31 Mar 2022 13:05:24 -0400 Subject: [PATCH] Remove io.micrometer.observation code --- spring-cloud-function-context/pom.xml | 9 +- .../DefaultFunctionTagsProvider.java | 26 -- .../observability/FunctionContext.java | 94 ----- .../observability/FunctionObservation.java | 66 ---- .../observability/FunctionTagsProvider.java | 32 -- .../ObservationFunctionAroundWrapper.java | 89 ----- .../FunctionTracingObservationHandler.java | 334 ------------------ .../MessageHeaderPropagatorGetter.java | 118 ------- .../MessageHeaderPropagatorSetter.java | 134 ------- ...FunctionAroundWrapperIntegrationTests.java | 136 ------- 10 files changed, 2 insertions(+), 1036 deletions(-) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java delete mode 100644 spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml index ac872b370..1addd268f 100644 --- a/spring-cloud-function-context/pom.xml +++ b/spring-cloud-function-context/pom.xml @@ -17,8 +17,8 @@ 1.10.2 - 2.0.0-SNAPSHOT - 1.0.0-SNAPSHOT + 2.0.0-M3 + 1.0.0-M3 @@ -135,11 +135,6 @@ - - io.micrometer - micrometer-observation - - io.micrometer micrometer-core diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java deleted file mode 100644 index 49762b8b6..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/DefaultFunctionTagsProvider.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - -import io.micrometer.common.Tags; - -public class DefaultFunctionTagsProvider implements FunctionTagsProvider { - @Override - public Tags getLowCardinalityTags(FunctionContext context) { - return Tags.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.of(context.getTargetFunction().getFunctionDefinition())); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java deleted file mode 100644 index d6fa90fd9..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionContext.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - - -import io.micrometer.core.lang.Nullable; -import io.micrometer.observation.Observation; - -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; - -/** - * Context. - * - * @author Marcin Grzejszczak - * @since 4.0.0 - */ -public class FunctionContext extends Observation.Context { - - private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction; - - private Object input; - - private Object modifiedInput; - - private Object output; - - private Object modifiedOutput; - - public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - this.targetFunction = targetFunction; - } - - public FunctionContext withInput(Object input) { - this.input = input; - this.modifiedInput = input; - return this; - } - - public FunctionContext withOutput(Object output) { - this.output = output; - this.modifiedOutput = output; - return this; - } - - @Nullable - public Object getInput() { - return input; - } - - public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() { - return targetFunction; - } - - @Nullable - public Object getModifiedInput() { - return modifiedInput; - } - - public void setModifiedInput(Object modifiedInput) { - this.modifiedInput = modifiedInput; - } - - @Nullable - public Object getOutput() { - return output; - } - - public void setOutput(Object output) { - this.output = output; - } - - @Nullable - public Object getModifiedOutput() { - return modifiedOutput; - } - - public void setModifiedOutput(Object modifiedOutput) { - this.modifiedOutput = modifiedOutput; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java deleted file mode 100644 index 7d4d568e4..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionObservation.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - -import io.micrometer.common.docs.TagKey; -import io.micrometer.observation.docs.DocumentedObservation; - -/** - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -enum FunctionObservation implements DocumentedObservation { - /** - * Observation created around a function execution. - */ - FUNCTION_OBSERVATION { - @Override - public String getName() { - return "spring.cloud.function"; - } - - @Override - public String getContextualName() { - return "function"; - } - - @Override - public TagKey[] getLowCardinalityTagKeys() { - return FunctionLowCardinalityTags.values(); - } - - @Override - public String getPrefix() { - return "spring.cloud.function"; - } - }; - - enum FunctionLowCardinalityTags implements TagKey { - - /** - * Name of the function. - */ - FUNCTION_NAME { - @Override - public String getKey() { - return "spring.cloud.function.name"; - } - } - - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java deleted file mode 100644 index 90712e63c..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/FunctionTagsProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2006-2009 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - -import io.micrometer.observation.Observation; - -/** - * {@link Observation.TagsProvider} for {@link FunctionContext}. - * - * @author Marcin Grzejszczak - */ -public interface FunctionTagsProvider extends Observation.TagsProvider { - - @Override - default boolean supportsContext(Observation.Context context) { - return context instanceof FunctionContext; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java deleted file mode 100644 index 4b031302b..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapper.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - -import io.micrometer.observation.Observation; -import io.micrometer.observation.ObservationRegistry; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; - -import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; -import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; - - - -/** - * @author Marcin Grzejszczak - * @since 4.0.0 - */ -public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper implements Observation.TagsProviderAware { - - private static final Log log = LogFactory.getLog(ObservationFunctionAroundWrapper.class); - - private final ObservationRegistry observationRegistry; - - private FunctionTagsProvider tagsProvider = new DefaultFunctionTagsProvider(); - - public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry) { - this.observationRegistry = observationRegistry; - } - - @Override - protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) { - return targetFunction.apply(message); // no instrumentation - } - else if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) { - return reactorStream((Publisher) message, targetFunction); - } - return nonReactorStream(message, targetFunction); - } - - private Object reactorStream(Publisher message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - // TODO - return message; - } - - private Object nonReactorStream(Object message, - SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - FunctionContext context = new FunctionContext(targetFunction).withInput(message); - Object invocationMessage = context.getModifiedInput(); - Object result = Observation - .createNotStarted(FunctionObservation.FUNCTION_OBSERVATION.getName(), context, this.observationRegistry) - .contextualName(FunctionObservation.FUNCTION_OBSERVATION.getContextualName()) - .tagsProvider(this.tagsProvider) - .observe(() -> { - Object r = message == null ? targetFunction.get() : targetFunction.apply(invocationMessage); - context.setOutput(r); - return r; - }); - if (result == null) { - if (log.isDebugEnabled()) { - log.debug("Returned message is null - we have a consumer"); - } - return null; - } - return context.getModifiedOutput(); - } - - @Override - public void setTagsProvider(FunctionTagsProvider functionTagsProvider) { - this.tagsProvider = functionTagsProvider; - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java deleted file mode 100644 index cda860c0c..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/FunctionTracingObservationHandler.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability.tracing; - -import io.micrometer.observation.Observation; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.handler.TracingObservationHandler; -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.cloud.function.context.catalog.observability.FunctionContext; -import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.util.StringUtils; - -/** - * Function Tracing Observation Handler. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class FunctionTracingObservationHandler implements TracingObservationHandler { - - private static final Log log = LogFactory.getLog(FunctionTracingObservationHandler.class); - - /** - * Using the literal "broker" until we come up with a better solution. - * - *

- * If the message originated from a binder (consumer binding), there will be different - * headers present (e.g. "KafkaHeaders.RECEIVED_TOPIC" Vs. - * "AmqpHeaders.CONSUMER_QUEUE" (unless the application removes them before sending). - * These don't represent the broker, rather a queue, and in any case the heuristics - * are not great. At least we might be able to tell if this is rabbit or not (ex how - * spring-rabbit works). We need to think this through before making an api, possibly - * experimenting. - * - *

- * If the app is outbound only (producer), there's no indication of what type the - * destination broker is. This may hint at a non-manual solution being overwriting the - * remoteServiceName later, similar to how servlet instrumentation lazy set - * "http.route". - */ - private static final String REMOTE_SERVICE_NAME = "broker"; - - private final Tracer tracer; - - private final Propagator propagator; - - private final Propagator.Getter getter; - - private final Propagator.Setter setter; - - public FunctionTracingObservationHandler(Tracer tracer, Propagator propagator, MessageHeaderPropagatorGetter getter, MessageHeaderPropagatorSetter setter) { - this.tracer = tracer; - this.propagator = propagator; - this.getter = getter; - this.setter = setter; - } - - @Override - public void onStart(FunctionContext context) { - Message message = (Message) context.getInput(); - MessageAndSpans wrappedInputMessage = null; - SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction = context.getTargetFunction(); - Span functionSpan = null; - if (message == null && targetFunction.isSupplier()) { // Supplier - if (log.isDebugEnabled()) { - log.debug("Creating a span for a supplier"); - } - functionSpan = this.tracer.nextSpan().start(); - } - else { - if (log.isDebugEnabled()) { - log.debug("Will retrieve the tracing headers from the message"); - } - // This will create a handle span - wrappedInputMessage = wrapInputMessage(context, message); - if (log.isDebugEnabled()) { - log.debug("Wrapped input msg " + wrappedInputMessage); - } - functionSpan = wrappedInputMessage.childSpan; - } - context.put(MessageAndSpans.class, wrappedInputMessage); - // This is the function span - getTracingContext(context).setSpan(functionSpan); - } - - @Override - public void onStop(FunctionContext context) { - MessageAndSpans invocationMessage = context.get(MessageAndSpans.class); - Span functionSpan = getRequiredSpan(context); - functionSpan.name(context.getTargetFunction().getFunctionDefinition()).end(); - Object result = context.getOutput(); - Message msgResult = toMessage(result); - MessageAndSpan wrappedOutputMessage; - if (log.isDebugEnabled()) { - log.debug("Will instrument the output message"); - } - if (invocationMessage != null) { - wrappedOutputMessage = wrapOutputMessage(msgResult, invocationMessage.parentSpan, context); - } - else { - wrappedOutputMessage = wrapOutputMessage(msgResult, functionSpan, context); - } - if (log.isDebugEnabled()) { - log.debug("Wrapped output msg " + wrappedOutputMessage); - } - wrappedOutputMessage.span.end(); - context.setModifiedOutput(wrappedOutputMessage.msg); - } - -// String inputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-in-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-in-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } -// -// String outputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-out-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-out-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } - - private Message toMessage(Object result) { - if (!(result instanceof Message)) { - return MessageBuilder.withPayload(result).build(); - } - return (Message) result; - } - - /** - * Wraps the given input message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpans wrapInputMessage(FunctionContext context, Message message) { - MessageHeaderAccessor headers = mutableHeaderAccessor(message); - Span.Builder consumerSpanBuilder = this.propagator.extract(headers, this.getter); - Span handleSpan = consumerSpan(context, consumerSpanBuilder); - if (log.isDebugEnabled()) { - log.debug("Built a consumer span " + handleSpan); - } - Span functionSpan = tracer.nextSpan(handleSpan).name(context.getContextualName()).start(); - clearTracingHeaders(headers); - if (message instanceof ErrorMessage) { - return new MessageAndSpans(new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - headers.setImmutable(); - return new MessageAndSpans(new GenericMessage<>(message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - - // Handle span - private Span consumerSpan(FunctionContext context, Span.Builder consumerSpanBuilder) { - // TODO: Add this as a documented span - consumerSpanBuilder.kind(Span.Kind.CONSUMER).name("handle"); - consumerSpanBuilder.remoteServiceName(REMOTE_SERVICE_NAME); - // this is the consumer part of the producer->consumer mechanism - Span consumerSpan = consumerSpanBuilder.start(); - tagSpan(context, consumerSpan); - // we're ending this immediately just to have a properly nested graph - consumerSpan.end(); - return consumerSpan; - } - - private MessageHeaderAccessor mutableHeaderAccessor(Message message) { - MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); - if (accessor != null && accessor.isMutable()) { - return accessor; - } - MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message); - headers.setLeaveMutable(true); - return headers; - } - - private void clearTracingHeaders(MessageHeaderAccessor headers) { - MessageHeaderPropagatorSetter.removeHeaders(headers, this.propagator.fields()); - } - - /** - * Wraps the given output message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpan wrapOutputMessage(Message message, Span parentSpan, FunctionContext context) { - Message retrievedMessage = getMessage(message); - MessageHeaderAccessor headers = mutableHeaderAccessor(retrievedMessage); - Span.Builder sendSpanBuilder = tracer.spanBuilder().setParent(parentSpan.context()); - clearTracingHeaders(headers); - Span sendSpan = createProducerSpan(context, headers, sendSpanBuilder); - this.propagator.inject(sendSpan.context(), headers, this.setter); - if (log.isDebugEnabled()) { - log.debug("Created a new span output message " + sendSpanBuilder); - } - return new MessageAndSpan(outputMessage(message, retrievedMessage, headers), sendSpan); - } - - private Message getMessage(Message message) { - Object payload = message.getPayload(); - if (payload instanceof MessagingException e) { - Message failedMessage = e.getFailedMessage(); - return failedMessage != null ? failedMessage : message; - } - return message; - } - - private Span createProducerSpan(FunctionContext context, MessageHeaderAccessor headers, Span.Builder spanBuilder) { - // TODO: Add documented span for this - spanBuilder.kind(Span.Kind.PRODUCER).name("send").remoteServiceName(toRemoteServiceName(headers)); - Span span = spanBuilder.start(); - if (!span.isNoop()) { - tagSpan(context, span); - } - return span; - } - - private String toRemoteServiceName(MessageHeaderAccessor headers) { -// for (String key : headers.getMessageHeaders().keySet()) { -// if (key.startsWith("kafka_")) { -// return "kafka"; -// } -// else if (key.startsWith("amqp_")) { -// return "rabbitmq"; -// } -// } - String serviceName = (String) headers.getHeader(MessageUtils.TARGET_PROTOCOL); - if (!StringUtils.hasLength(serviceName)) { - serviceName = REMOTE_SERVICE_NAME; - } - return serviceName; - } - - private Message outputMessage(Message originalMessage, Message retrievedMessage, - MessageHeaderAccessor additionalHeaders) { - MessageHeaderAccessor headers = mutableHeaderAccessor(originalMessage); - if (originalMessage instanceof ErrorMessage errorMessage) { - headers.copyHeaders(MessageHeaderPropagatorSetter.copyHeaders(additionalHeaders.getMessageHeaders(), - this.propagator.fields())); - return new ErrorMessage(errorMessage.getPayload(), isWebSockets(headers) ? headers.getMessageHeaders() - : new MessageHeaders(headers.getMessageHeaders()), errorMessage.getOriginalMessage()); - } - headers.copyHeaders(additionalHeaders.getMessageHeaders()); - return new GenericMessage<>(retrievedMessage.getPayload(), - isWebSockets(headers) ? headers.getMessageHeaders() : new MessageHeaders(headers.getMessageHeaders())); - } - - private boolean isWebSockets(MessageHeaderAccessor headerAccessor) { - return headerAccessor.getMessageHeaders().containsKey("stompCommand") - || headerAccessor.getMessageHeaders().containsKey("simpMessageType"); - } - - @Override - public Tracer getTracer() { - return this.tracer; - } - -// @Override - public boolean supportsContext(Observation.Context context) { - return context instanceof FunctionContext && (((FunctionContext) context).getInput() instanceof Message); - } - - private static class MessageAndSpan { - - final Message msg; - - final Span span; - - MessageAndSpan(Message msg, Span span) { - this.msg = msg; - this.span = span; - } - - @Override - public String toString() { - return "MessageAndSpan{" + "msg=" + this.msg + ", span=" + this.span + '}'; - } - - } - - private static class MessageAndSpans { - - final Message msg; - - final Span parentSpan; - - final Span childSpan; - - MessageAndSpans(Message msg, Span parentSpan, Span childSpan) { - this.msg = msg; - this.parentSpan = parentSpan; - this.childSpan = childSpan; - } - - @Override - public String toString() { - return "MessageAndSpans{" + "msg=" + msg + ", parentSpan=" + parentSpan + ", childSpan=" + childSpan + '}'; - } - - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java deleted file mode 100644 index ff5140bcf..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorGetter.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability.tracing; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.StringUtils; - -/** - * Getter for Spring Integration based communication. - * - * This always sets native headers in defence of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @since 4.0.0 - */ -public class MessageHeaderPropagatorGetter implements Propagator.Getter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorGetter.class); - - @Override - public String get(MessageHeaderAccessor accessor, String key) { - try { - String value = doGet(accessor, key); - if (StringUtils.hasText(value)) { - return value; - } - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - return null; - } - - private String doGet(MessageHeaderAccessor accessor, String key) { - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - Map> nativeHeadersMap = nativeAccessor.toNativeHeaderMap(); - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - Map nativeHeadersMap = (Map) nativeHeaders; - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - } - Set> headerEntries = accessor.getMessageHeaders().entrySet(); - return getFromHeaders(headerEntries, key); - } - - private String getFromHeaders(Set> headerEntries, String key) { - for (Map.Entry entry : headerEntries) { - if (entry.getKey().equalsIgnoreCase(key)) { - Object result = entry.getValue(); - if (result != null) { - if (result instanceof byte[]) { - return new String((byte[]) result, StandardCharsets.UTF_8); - } - return result.toString(); - } - } - } - return null; - } - - private String getFromNativeHeaders(Map nativeHeaders, String key) { - Set entrySet = nativeHeaders.entrySet(); - for (Map.Entry entries : entrySet) { - if (entries.getKey() instanceof String) { - String headersKey = (String) entries.getKey(); - if (headersKey.equalsIgnoreCase(key)) { - Object result = entries.getValue(); - if (result instanceof List && !((List) result).isEmpty()) { - return String.valueOf(((List) result).get(0)); - } - } - } - } - return null; - } - - @Override - public String toString() { - return "MessageHeaderPropagatorGetter{}"; - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java deleted file mode 100644 index f37d0fba6..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/observability/tracing/MessageHeaderPropagatorSetter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability.tracing; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.LinkedMultiValueMap; - -/** - * Setter for Spring Integration based communication. - * - * This always sets native headers in defense of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class MessageHeaderPropagatorSetter implements Propagator.Setter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorSetter.class); - - static Map copyHeaders(Map headers, List headersToCopy) { - Map copiedHeaders = new HashMap<>(); - for (Map.Entry entry : headers.entrySet()) { - if (headersToCopy.contains(entry.getKey())) { - copiedHeaders.put(entry.getKey(), entry.getValue()); - } - } - return copiedHeaders; - } - - static void removeHeaders(MessageHeaderAccessor accessor, List keysToRemove) { - for (String keyToRemove : keysToRemove) { - accessor.removeHeader(keyToRemove); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - if (accessor.isMutable()) { - // 1184 native headers can be an immutable map - ensureNativeHeadersAreMutable(nativeAccessor).removeNativeHeader(keyToRemove); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - ((Map) nativeHeaders).remove(keyToRemove); - } - } - } - } - - /** - * Since for some reason, the native headers sometimes are immutable even though the - * accessor says that the headers are mutable, then we have to ensure their - * mutability. We do so by first making a mutable copy of the native headers, then by - * removing the native headers from the headers map and replacing them with a mutable - * copy. Workaround for #1184 - * @param nativeAccessor accessor containing (or not) native headers - * @return modified accessor - */ - private static NativeMessageHeaderAccessor ensureNativeHeadersAreMutable( - NativeMessageHeaderAccessor nativeAccessor) { - Map> nativeHeaderMap = nativeAccessor.toNativeHeaderMap(); - nativeHeaderMap = nativeHeaderMap instanceof LinkedMultiValueMap ? nativeHeaderMap - : new LinkedMultiValueMap<>(nativeHeaderMap); - nativeAccessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaderMap); - return nativeAccessor; - } - - @Override - public void set(MessageHeaderAccessor accessor, String key, String value) { - try { - doPut(accessor, key, value); - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - } - - private void doPut(MessageHeaderAccessor accessor, String key, String value) { - accessor.setHeader(key, value); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - ensureNativeHeadersAreMutable(nativeAccessor).setNativeHeader(key, value); - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders == null) { - nativeHeaders = new LinkedMultiValueMap<>(); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); - } - if (nativeHeaders instanceof Map) { - Map> copy = toNativeHeaderMap((Map>) nativeHeaders); - copy.put(key, Collections.singletonList(value)); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, copy); - } - } - } - - private Map> toNativeHeaderMap(Map> map) { - return (map != null ? new LinkedMultiValueMap<>(map) : Collections.emptyMap()); - } - - @Override - public String toString() { - return "MessageHeaderPropagatorSetter{}"; - } - -} diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java deleted file mode 100644 index f04cee3fa..000000000 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/observability/ObservationFunctionAroundWrapperIntegrationTests.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Copyright 2019-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.catalog.observability; - -import java.util.Collections; -import java.util.Deque; -import java.util.List; -import java.util.function.BiConsumer; -import java.util.function.Function; - -import com.fasterxml.jackson.databind.ObjectMapper; -import io.micrometer.core.instrument.MeterRegistry; -import io.micrometer.core.instrument.observation.TimerObservationHandler; -import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.micrometer.observation.ObservationHandler; -import io.micrometer.observation.ObservationRegistry; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.TraceContext; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.exporter.FinishedSpan; -import io.micrometer.tracing.propagation.Propagator; -import io.micrometer.tracing.test.SampleTestRunner; -import io.micrometer.tracing.test.reporter.BuildingBlocks; -import io.micrometer.tracing.test.simple.SpanAssert; -import io.micrometer.tracing.test.simple.SpansAssert; -import org.junit.jupiter.api.Disabled; - -import org.springframework.cloud.function.context.FunctionRegistration; -import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.cloud.function.context.catalog.observability.tracing.FunctionTracingObservationHandler; -import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorGetter; -import org.springframework.cloud.function.context.catalog.observability.tracing.MessageHeaderPropagatorSetter; -import org.springframework.cloud.function.context.config.JsonMessageConverter; -import org.springframework.cloud.function.json.JacksonMapper; -import org.springframework.core.convert.support.DefaultConversionService; -import org.springframework.messaging.Message; -import org.springframework.messaging.converter.CompositeMessageConverter; -import org.springframework.messaging.support.MessageBuilder; - -import static org.assertj.core.api.Assertions.assertThat; - -@Disabled -class ObservationFunctionAroundWrapperIntegrationTests extends SampleTestRunner { - - CompositeMessageConverter messageConverter = new CompositeMessageConverter( - Collections.singletonList(new JsonMessageConverter(new JacksonMapper(new ObjectMapper())))); - - SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(new DefaultConversionService(), messageConverter, - new JacksonMapper(new ObjectMapper())); - - ObservationFunctionAroundWrapperIntegrationTests() { - MeterRegistry meterRegistry = new SimpleMeterRegistry(); - ObservationRegistry registry = (ObservationRegistry) ObservationRegistry.create().observationConfig().observationHandler(new TimerObservationHandler(meterRegistry)); - //super(SampleRunnerConfig.builder().build(), new SimpleMeterRegistry().withTimerObservationHandler()); - } - - @Override - public BiConsumer> customizeObservationHandlers() { - return (buildingBlocks, observationHandlers) -> observationHandlers.addFirst(new FunctionTracingObservationHandler(buildingBlocks.getTracer(), testPropagator(buildingBlocks.getTracer()), new MessageHeaderPropagatorGetter(), new MessageHeaderPropagatorSetter())); - } - - private Propagator testPropagator(Tracer tracer) { - return new Propagator() { - - @Override - public void inject(TraceContext context, C carrier, Setter setter) { - setter.set(carrier, "superHeader", "test"); - } - - @Override - public List fields() { - return Collections.singletonList("superHeader"); - } - - @Override - public Span.Builder extract(C carrier, Getter getter) { - return tracer.spanBuilder(); - } - }; - } - - @Override - public SampleTestRunnerConsumer yourCode() throws Exception { - return (buildingBlocks, meterRegistry) -> { - ObservationFunctionAroundWrapper wrapper = new ObservationFunctionAroundWrapper((ObservationRegistry) meterRegistry); - - // TESTS - - test_tracing_with_function(wrapper, buildingBlocks); - }; - } - - private void test_tracing_with_function(ObservationFunctionAroundWrapper wrapper, BuildingBlocks bb) { - FunctionRegistration registration = new FunctionRegistration<>(new GreeterFunction(), - "greeter").type(FunctionTypeUtils.discoverFunctionTypeFromClass(GreeterFunction.class)); - catalog.register(registration); - SimpleFunctionRegistry.FunctionInvocationWrapper function = catalog.lookup("greeter"); - - Message result = (Message) wrapper - .apply(MessageBuilder.withPayload("hello").setHeader("superHeader", "someValue").build(), function); - - assertThat(result.getPayload()).isEqualTo("HELLO"); - List spans = bb.getFinishedSpans(); - SpansAssert.assertThat(spans) - .haveSameTraceId() - .hasSize(3); - SpanAssert.assertThat(spans.get(0)).hasNameEqualTo("handle").isStarted(); - SpanAssert.assertThat(spans.get(1)).hasNameEqualTo("greeter").isStarted(); - SpanAssert.assertThat(spans.get(2)).hasNameEqualTo("send").isStarted(); - } - - - private static class GreeterFunction implements Function { - - @Override - public String apply(String in) { - return in.toUpperCase(); - } - - } -}