From d4917dc8a7ac4d9578dc11617b58b97f8b218f9b Mon Sep 17 00:00:00 2001 From: Marcin Grzejszczak Date: Wed, 19 Oct 2022 21:00:16 +0200 Subject: [PATCH] Fixed Observability module to work with Sender and Receiver contexts --- .../ObservabilityAutoConfiguration.java | 40 --- ...DefaultFunctionObservationConvention.java} | 24 +- ...FunctionReceiverObservationConvention.java | 48 +++ ...ltFunctionSenderObservationConvention.java | 26 +- .../observability/FunctionContext.java | 49 +-- .../observability/FunctionObservation.java | 49 ++- .../FunctionObservationConvention.java | 3 +- .../FunctionReceiverContext.java | 44 +++ ...FunctionReceiverObservationConvention.java | 35 ++ .../observability/FunctionSenderContext.java | 60 +--- ... FunctionSenderObservationConvention.java} | 6 +- .../FunctionTracingObservationHandler.java | 335 ------------------ .../MessageHeaderPropagatorGetter.java | 118 ------ .../MessageHeaderPropagatorSetter.java | 134 ------- .../ObservationAutoConfiguration.java | 7 +- .../ObservationFunctionAroundWrapper.java | 102 ++++-- ...ot.autoconfigure.AutoConfiguration.imports | 1 - spring-cloud-function-observability/.jdk8 | 0 spring-cloud-function-observability/pom.xml | 52 --- .../observability/FunctionObservation.java | 65 ---- .../FunctionTracingObservationHandler.java | 335 ------------------ .../MessageHeaderPropagatorGetter.java | 118 ------ .../MessageHeaderPropagatorSetter.java | 134 ------- .../ObservationAutoConfiguration.java | 40 --- .../ObservationFunctionAroundWrapper.java | 88 ----- .../main/resources/META-INF/spring.factories | 0 ...ot.autoconfigure.AutoConfiguration.imports | 1 - ...ObservationFunctionAroundWrapperTests.java | 75 ---- 28 files changed, 307 insertions(+), 1682 deletions(-) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ObservabilityAutoConfiguration.java rename spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/{DefaultFunctionTagsProvider.java => DefaultFunctionObservationConvention.java} (61%) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionReceiverObservationConvention.java rename spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java => spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionSenderObservationConvention.java (55%) rename spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java => spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservationConvention.java (90%) create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverContext.java create mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverObservationConvention.java rename spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java => spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderContext.java (50%) rename spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/{FunctionTagsProvider.java => FunctionSenderObservationConvention.java} (80%) delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java delete mode 100644 spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java delete mode 100644 spring-cloud-function-observability/.jdk8 delete mode 100644 spring-cloud-function-observability/pom.xml delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java delete mode 100644 spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java delete mode 100644 spring-cloud-function-observability/src/main/resources/META-INF/spring.factories delete mode 100644 spring-cloud-function-observability/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports delete mode 100644 spring-cloud-function-observability/src/test/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapperTests.java diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ObservabilityAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ObservabilityAutoConfiguration.java deleted file mode 100644 index 1c846ad1c..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/ObservabilityAutoConfiguration.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2022-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.context.config; - -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.propagation.Propagator; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.cloud.function.observability.FunctionTracingObservationHandler; -import org.springframework.cloud.function.observability.MessageHeaderPropagatorGetter; -import org.springframework.cloud.function.observability.MessageHeaderPropagatorSetter; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -@Configuration(proxyBeanMethods = false) -@ConditionalOnClass(name = "io.micrometer.tracing.Tracer") -public class ObservabilityAutoConfiguration { - - @Bean - @ConditionalOnBean(Tracer.class) - FunctionTracingObservationHandler functionTracingObservationHandler(Tracer tracer, Propagator propagator) { - return new FunctionTracingObservationHandler(tracer, propagator, new MessageHeaderPropagatorGetter(), - new MessageHeaderPropagatorSetter()); - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java similarity index 61% rename from spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java index 8b75b4ad9..7031862ce 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java @@ -18,9 +18,31 @@ package org.springframework.cloud.function.observability; import io.micrometer.common.KeyValues; -public class DefaultFunctionTagsProvider implements FunctionTagsProvider { +/** + * Default implementation of {@link FunctionReceiverObservationConvention}. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class DefaultFunctionObservationConvention implements FunctionObservationConvention { + + /** + * Singleton instance of this convention. + */ + public static final FunctionObservationConvention INSTANCE = new DefaultFunctionObservationConvention(); + @Override public KeyValues getLowCardinalityKeyValues(FunctionContext context) { return KeyValues.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.withValue(context.getTargetFunction().getFunctionDefinition())); } + + @Override + public String getName() { + return "spring.cloud.function"; + } + + @Override + public String getContextualName(FunctionContext context) { + return context.getTargetFunction().getFunctionDefinition(); + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionReceiverObservationConvention.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionReceiverObservationConvention.java new file mode 100644 index 000000000..6e52b62c1 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionReceiverObservationConvention.java @@ -0,0 +1,48 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.observability; + +import io.micrometer.common.KeyValues; + +/** + * Default implementation of {@link FunctionReceiverObservationConvention}. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class DefaultFunctionReceiverObservationConvention implements FunctionReceiverObservationConvention { + + /** + * Singleton instance of this convention. + */ + public static final FunctionReceiverObservationConvention INSTANCE = new DefaultFunctionReceiverObservationConvention(); + + @Override + public KeyValues getLowCardinalityKeyValues(FunctionReceiverContext context) { + return KeyValues.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.withValue(context.getTargetFunction().getFunctionDefinition())); + } + + @Override + public String getName() { + return "spring.cloud.function.receive"; + } + + @Override + public String getContextualName(FunctionReceiverContext context) { + return context.getTargetFunction().getFunctionDefinition() + " receive"; + } +} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionSenderObservationConvention.java similarity index 55% rename from spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionSenderObservationConvention.java index 8b75b4ad9..e30cbd044 100644 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionTagsProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionSenderObservationConvention.java @@ -18,9 +18,31 @@ package org.springframework.cloud.function.observability; import io.micrometer.common.KeyValues; -public class DefaultFunctionTagsProvider implements FunctionTagsProvider { +/** + * Default implementation of {@link FunctionSenderObservationConvention}. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class DefaultFunctionSenderObservationConvention implements FunctionSenderObservationConvention { + + /** + * Singleton instance of this convention. + */ + public static final FunctionSenderObservationConvention INSTANCE = new DefaultFunctionSenderObservationConvention(); + @Override - public KeyValues getLowCardinalityKeyValues(FunctionContext context) { + public KeyValues getLowCardinalityKeyValues(FunctionSenderContext context) { return KeyValues.of(FunctionObservation.FunctionLowCardinalityTags.FUNCTION_NAME.withValue(context.getTargetFunction().getFunctionDefinition())); } + + @Override + public String getName() { + return "spring.cloud.function.send"; + } + + @Override + public String getContextualName(FunctionSenderContext context) { + return context.getTargetFunction().getFunctionDefinition() + " send"; + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java index 4fd047f2c..a0d7f1c63 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java @@ -21,7 +21,7 @@ import io.micrometer.observation.Observation; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; /** - * Context. + * {@link Observation.Context} for function processing. * * @author Marcin Grzejszczak * @since 4.0.0 @@ -30,59 +30,12 @@ public class FunctionContext extends Observation.Context { private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction; - private Object input; - - private Object modifiedInput; - - private Object output; - - private Object modifiedOutput; - public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { this.targetFunction = targetFunction; } - public FunctionContext withInput(Object input) { - this.input = input; - this.modifiedInput = input; - return this; - } - - public FunctionContext withOutput(Object output) { - this.output = output; - this.modifiedOutput = output; - return this; - } - - public Object getInput() { - return input; - } - public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() { return targetFunction; } - public Object getModifiedInput() { - return modifiedInput; - } - - public void setModifiedInput(Object modifiedInput) { - this.modifiedInput = modifiedInput; - } - - public Object getOutput() { - return output; - } - - public void setOutput(Object output) { - this.output = output; - } - - public Object getModifiedOutput() { - return modifiedOutput; - } - - public void setModifiedOutput(Object modifiedOutput) { - this.modifiedOutput = modifiedOutput; - } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java index 2032bfdc2..ef2a207c2 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java @@ -17,6 +17,8 @@ package org.springframework.cloud.function.observability; import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; import io.micrometer.observation.docs.ObservationDocumentation; /** @@ -26,17 +28,52 @@ import io.micrometer.observation.docs.ObservationDocumentation; */ enum FunctionObservation implements ObservationDocumentation { /** - * Observation created around a function execution. + * Observation created around receiving a message (via consumer or function). */ - FUNCTION_OBSERVATION { + FUNCTION_CONSUMER_OBSERVATION { @Override - public String getName() { - return "spring.cloud.function"; + public Class> getDefaultConvention() { + return DefaultFunctionReceiverObservationConvention.class; } @Override - public String getContextualName() { - return "function"; + public KeyName[] getLowCardinalityKeyNames() { + return FunctionLowCardinalityTags.values(); + } + + @Override + public String getPrefix() { + return "spring.cloud.function"; + } + }, + + /** + * Observation created around producing a message (via supplier or function). + */ + FUNCTION_PRODUCER_OBSERVATION { + @Override + public Class> getDefaultConvention() { + return DefaultFunctionSenderObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return FunctionLowCardinalityTags.values(); + } + + @Override + public String getPrefix() { + return "spring.cloud.function"; + } + }, + + /** + * Observation created around processing a message (functional bean processing). + */ + FUNCTION_PROCESSING_OBSERVATION { + @Override + public Class> getDefaultConvention() { + return DefaultFunctionObservationConvention.class; } @Override diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservationConvention.java similarity index 90% rename from spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservationConvention.java index 0dcd7e389..1a5be2a0c 100644 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionObservationConvention.java @@ -24,8 +24,9 @@ import io.micrometer.observation.ObservationConvention; * * @author Marcin Grzejszczak * @author Oleg Zhurakousky + * @since 4.0.0 */ -public interface FunctionTagsProvider extends ObservationConvention { +public interface FunctionObservationConvention extends ObservationConvention { @Override default boolean supportsContext(Observation.Context context) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverContext.java new file mode 100644 index 000000000..bfa615671 --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverContext.java @@ -0,0 +1,44 @@ +/* + * Copyright 2013-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.observability; + +import io.micrometer.observation.transport.ReceiverContext; + +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; +import org.springframework.messaging.Message; + +/** + * {@link ReceiverContext} for receiving messages through functional interfaces. + * + * @author Marcin Grzejszczak + * @since 4.0.0 + */ +public class FunctionReceiverContext extends ReceiverContext> { + + private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction; + + public FunctionReceiverContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Message carrier) { + super((message, key) -> (String) message.getHeaders().get(key)); + this.targetFunction = targetFunction; + setCarrier(carrier); + } + + public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() { + return targetFunction; + } + +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverObservationConvention.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverObservationConvention.java new file mode 100644 index 000000000..865bb1dba --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionReceiverObservationConvention.java @@ -0,0 +1,35 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.observability; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; + +/** + * {@link ObservationConvention} for {@link FunctionReceiverContext}. + * + * @author Marcin Grzejszczak + * @author Oleg Zhurakousky + * @since 4.0.0 + */ +public interface FunctionReceiverObservationConvention extends ObservationConvention { + + @Override + default boolean supportsContext(Observation.Context context) { + return context instanceof FunctionReceiverContext; + } +} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderContext.java similarity index 50% rename from spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderContext.java index 4fd047f2c..c3bb6df34 100644 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderContext.java @@ -16,73 +16,31 @@ package org.springframework.cloud.function.observability; -import io.micrometer.observation.Observation; +import java.util.Objects; + +import io.micrometer.observation.transport.SenderContext; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; +import org.springframework.messaging.support.MessageBuilder; /** - * Context. + * {@link SenderContext} for sending messages through functional interfaces. * * @author Marcin Grzejszczak * @since 4.0.0 */ -public class FunctionContext extends Observation.Context { +public class FunctionSenderContext extends SenderContext> { private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction; - private Object input; - - private Object modifiedInput; - - private Object output; - - private Object modifiedOutput; - - public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + public FunctionSenderContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, MessageBuilder carrier) { + super((messageBuilder, key, value) -> Objects.requireNonNull(messageBuilder).setHeader(key, value)); this.targetFunction = targetFunction; - } - - public FunctionContext withInput(Object input) { - this.input = input; - this.modifiedInput = input; - return this; - } - - public FunctionContext withOutput(Object output) { - this.output = output; - this.modifiedOutput = output; - return this; - } - - public Object getInput() { - return input; + setCarrier(carrier); } public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() { return targetFunction; } - public Object getModifiedInput() { - return modifiedInput; - } - - public void setModifiedInput(Object modifiedInput) { - this.modifiedInput = modifiedInput; - } - - public Object getOutput() { - return output; - } - - public void setOutput(Object output) { - this.output = output; - } - - public Object getModifiedOutput() { - return modifiedOutput; - } - - public void setModifiedOutput(Object modifiedOutput) { - this.modifiedOutput = modifiedOutput; - } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderObservationConvention.java similarity index 80% rename from spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java rename to spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderObservationConvention.java index 0dcd7e389..fb78c095d 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTagsProvider.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionSenderObservationConvention.java @@ -20,15 +20,15 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationConvention; /** - * {@link ObservationConvention} for {@link FunctionContext}. + * {@link ObservationConvention} for {@link FunctionSenderContext}. * * @author Marcin Grzejszczak * @author Oleg Zhurakousky */ -public interface FunctionTagsProvider extends ObservationConvention { +public interface FunctionSenderObservationConvention extends ObservationConvention { @Override default boolean supportsContext(Observation.Context context) { - return context instanceof FunctionContext; + return context instanceof FunctionSenderContext; } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java deleted file mode 100644 index 1758a4ce9..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import io.micrometer.observation.Observation; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.handler.TracingObservationHandler; -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.util.StringUtils; - - - -/** - * Function Tracing Observation Handler. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class FunctionTracingObservationHandler implements TracingObservationHandler { - - private static final Log log = LogFactory.getLog(FunctionTracingObservationHandler.class); - - /** - * Using the literal "broker" until we come up with a better solution. - * - *

- * If the message originated from a binder (consumer binding), there will be different - * headers present (e.g. "KafkaHeaders.RECEIVED_TOPIC" Vs. - * "AmqpHeaders.CONSUMER_QUEUE" (unless the application removes them before sending). - * These don't represent the broker, rather a queue, and in any case the heuristics - * are not great. At least we might be able to tell if this is rabbit or not (ex how - * spring-rabbit works). We need to think this through before making an api, possibly - * experimenting. - * - *

- * If the app is outbound only (producer), there's no indication of what type the - * destination broker is. This may hint at a non-manual solution being overwriting the - * remoteServiceName later, similar to how servlet instrumentation lazy set - * "http.route". - */ - private static final String REMOTE_SERVICE_NAME = "broker"; - - private final Tracer tracer; - - private final Propagator propagator; - - private final Propagator.Getter getter; - - private final Propagator.Setter setter; - - public FunctionTracingObservationHandler(Tracer tracer, Propagator propagator, MessageHeaderPropagatorGetter getter, MessageHeaderPropagatorSetter setter) { - this.tracer = tracer; - this.propagator = propagator; - this.getter = getter; - this.setter = setter; - } - - @Override - public void onStart(FunctionContext context) { - Message message = (Message) context.getInput(); - MessageAndSpans wrappedInputMessage = null; - SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction = context.getTargetFunction(); - Span functionSpan = null; - if (message == null && targetFunction.isSupplier()) { // Supplier - if (log.isDebugEnabled()) { - log.debug("Creating a span for a supplier"); - } - functionSpan = this.tracer.nextSpan().start(); - } - else { - if (log.isDebugEnabled()) { - log.debug("Will retrieve the tracing headers from the message"); - } - // This will create a handle span - wrappedInputMessage = wrapInputMessage(context, message); - if (log.isDebugEnabled()) { - log.debug("Wrapped input msg " + wrappedInputMessage); - } - functionSpan = wrappedInputMessage.childSpan; - } - context.put(MessageAndSpans.class, wrappedInputMessage); - // This is the function span - getTracingContext(context).setSpan(functionSpan); - } - - @Override - public void onStop(FunctionContext context) { - MessageAndSpans invocationMessage = context.get(MessageAndSpans.class); - Span functionSpan = getRequiredSpan(context); - functionSpan.name(context.getTargetFunction().getFunctionDefinition()).end(); - Object result = context.getOutput(); - Message msgResult = toMessage(result); - MessageAndSpan wrappedOutputMessage; - if (log.isDebugEnabled()) { - log.debug("Will instrument the output message"); - } - if (invocationMessage != null) { - wrappedOutputMessage = wrapOutputMessage(msgResult, invocationMessage.parentSpan, context); - } - else { - wrappedOutputMessage = wrapOutputMessage(msgResult, functionSpan, context); - } - if (log.isDebugEnabled()) { - log.debug("Wrapped output msg " + wrappedOutputMessage); - } - wrappedOutputMessage.span.end(); - context.setModifiedOutput(wrappedOutputMessage.msg); - } - -// String inputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-in-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-in-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } -// -// String outputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-out-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-out-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } - - private Message toMessage(Object result) { - if (!(result instanceof Message)) { - return MessageBuilder.withPayload(result).build(); - } - return (Message) result; - } - - /** - * Wraps the given input message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpans wrapInputMessage(FunctionContext context, Message message) { - MessageHeaderAccessor headers = mutableHeaderAccessor(message); - Span.Builder consumerSpanBuilder = this.propagator.extract(headers, this.getter); - Span handleSpan = consumerSpan(context, consumerSpanBuilder); - if (log.isDebugEnabled()) { - log.debug("Built a consumer span " + handleSpan); - } - Span functionSpan = tracer.nextSpan(handleSpan).name(context.getContextualName()).start(); - clearTracingHeaders(headers); - if (message instanceof ErrorMessage) { - return new MessageAndSpans(new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - headers.setImmutable(); - return new MessageAndSpans(new GenericMessage<>(message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - - // Handle span - private Span consumerSpan(FunctionContext context, Span.Builder consumerSpanBuilder) { - // TODO: Add this as a documented span - consumerSpanBuilder.kind(Span.Kind.CONSUMER).name("handle"); - consumerSpanBuilder.remoteServiceName(REMOTE_SERVICE_NAME); - // this is the consumer part of the producer->consumer mechanism - Span consumerSpan = consumerSpanBuilder.start(); - tagSpan(context, consumerSpan); - // we're ending this immediately just to have a properly nested graph - consumerSpan.end(); - return consumerSpan; - } - - private MessageHeaderAccessor mutableHeaderAccessor(Message message) { - MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); - if (accessor != null && accessor.isMutable()) { - return accessor; - } - MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message); - headers.setLeaveMutable(true); - return headers; - } - - private void clearTracingHeaders(MessageHeaderAccessor headers) { - MessageHeaderPropagatorSetter.removeHeaders(headers, this.propagator.fields()); - } - - /** - * Wraps the given output message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpan wrapOutputMessage(Message message, Span parentSpan, FunctionContext context) { - Message retrievedMessage = getMessage(message); - MessageHeaderAccessor headers = mutableHeaderAccessor(retrievedMessage); - Span.Builder sendSpanBuilder = tracer.spanBuilder().setParent(parentSpan.context()); - clearTracingHeaders(headers); - Span sendSpan = createProducerSpan(context, headers, sendSpanBuilder); - this.propagator.inject(sendSpan.context(), headers, this.setter); - if (log.isDebugEnabled()) { - log.debug("Created a new span output message " + sendSpanBuilder); - } - return new MessageAndSpan(outputMessage(message, retrievedMessage, headers), sendSpan); - } - - private Message getMessage(Message message) { - Object payload = message.getPayload(); - if (payload instanceof MessagingException e) { - Message failedMessage = e.getFailedMessage(); - return failedMessage != null ? failedMessage : message; - } - return message; - } - - private Span createProducerSpan(FunctionContext context, MessageHeaderAccessor headers, Span.Builder spanBuilder) { - // TODO: Add documented span for this - spanBuilder.kind(Span.Kind.PRODUCER).name("send").remoteServiceName(toRemoteServiceName(headers)); - Span span = spanBuilder.start(); - if (!span.isNoop()) { - tagSpan(context, span); - } - return span; - } - - private String toRemoteServiceName(MessageHeaderAccessor headers) { -// for (String key : headers.getMessageHeaders().keySet()) { -// if (key.startsWith("kafka_")) { -// return "kafka"; -// } -// else if (key.startsWith("amqp_")) { -// return "rabbitmq"; -// } -// } - String serviceName = (String) headers.getHeader(MessageUtils.TARGET_PROTOCOL); - if (!StringUtils.hasLength(serviceName)) { - serviceName = REMOTE_SERVICE_NAME; - } - return serviceName; - } - - private Message outputMessage(Message originalMessage, Message retrievedMessage, - MessageHeaderAccessor additionalHeaders) { - MessageHeaderAccessor headers = mutableHeaderAccessor(originalMessage); - if (originalMessage instanceof ErrorMessage errorMessage) { - headers.copyHeaders(MessageHeaderPropagatorSetter.copyHeaders(additionalHeaders.getMessageHeaders(), - this.propagator.fields())); - return new ErrorMessage(errorMessage.getPayload(), isWebSockets(headers) ? headers.getMessageHeaders() - : new MessageHeaders(headers.getMessageHeaders()), errorMessage.getOriginalMessage()); - } - headers.copyHeaders(additionalHeaders.getMessageHeaders()); - return new GenericMessage<>(retrievedMessage.getPayload(), - isWebSockets(headers) ? headers.getMessageHeaders() : new MessageHeaders(headers.getMessageHeaders())); - } - - private boolean isWebSockets(MessageHeaderAccessor headerAccessor) { - return headerAccessor.getMessageHeaders().containsKey("stompCommand") - || headerAccessor.getMessageHeaders().containsKey("simpMessageType"); - } - - @Override - public Tracer getTracer() { - return this.tracer; - } - -// @Override - public boolean supportsContext(Observation.Context context) { - return context instanceof FunctionContext && (((FunctionContext) context).getInput() instanceof Message); - } - - private static class MessageAndSpan { - - final Message msg; - - final Span span; - - MessageAndSpan(Message msg, Span span) { - this.msg = msg; - this.span = span; - } - - @Override - public String toString() { - return "MessageAndSpan{" + "msg=" + this.msg + ", span=" + this.span + '}'; - } - - } - - private static class MessageAndSpans { - - final Message msg; - - final Span parentSpan; - - final Span childSpan; - - MessageAndSpans(Message msg, Span parentSpan, Span childSpan) { - this.msg = msg; - this.parentSpan = parentSpan; - this.childSpan = childSpan; - } - - @Override - public String toString() { - return "MessageAndSpans{" + "msg=" + msg + ", parentSpan=" + parentSpan + ", childSpan=" + childSpan + '}'; - } - - } -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java deleted file mode 100644 index ab219da81..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.StringUtils; - -/** - * Getter for Spring Integration based communication. - * - * This always sets native headers in defence of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @since 4.0.0 - */ -public class MessageHeaderPropagatorGetter implements Propagator.Getter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorGetter.class); - - @Override - public String get(MessageHeaderAccessor accessor, String key) { - try { - String value = doGet(accessor, key); - if (StringUtils.hasText(value)) { - return value; - } - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - return null; - } - - private String doGet(MessageHeaderAccessor accessor, String key) { - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - Map> nativeHeadersMap = nativeAccessor.toNativeHeaderMap(); - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - Map nativeHeadersMap = (Map) nativeHeaders; - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - } - Set> headerEntries = accessor.getMessageHeaders().entrySet(); - return getFromHeaders(headerEntries, key); - } - - private String getFromHeaders(Set> headerEntries, String key) { - for (Map.Entry entry : headerEntries) { - if (entry.getKey().equalsIgnoreCase(key)) { - Object result = entry.getValue(); - if (result != null) { - if (result instanceof byte[]) { - return new String((byte[]) result, StandardCharsets.UTF_8); - } - return result.toString(); - } - } - } - return null; - } - - private String getFromNativeHeaders(Map nativeHeaders, String key) { - Set entrySet = nativeHeaders.entrySet(); - for (Map.Entry entries : entrySet) { - if (entries.getKey() instanceof String) { - String headersKey = (String) entries.getKey(); - if (headersKey.equalsIgnoreCase(key)) { - Object result = entries.getValue(); - if (result instanceof List && !((List) result).isEmpty()) { - return String.valueOf(((List) result).get(0)); - } - } - } - } - return null; - } - - @Override - public String toString() { - return "MessageHeaderPropagatorGetter{}"; - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java deleted file mode 100644 index 4ccfae2e2..000000000 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.LinkedMultiValueMap; - -/** - * Setter for Spring Integration based communication. - * - * This always sets native headers in defense of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class MessageHeaderPropagatorSetter implements Propagator.Setter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorSetter.class); - - static Map copyHeaders(Map headers, List headersToCopy) { - Map copiedHeaders = new HashMap<>(); - for (Map.Entry entry : headers.entrySet()) { - if (headersToCopy.contains(entry.getKey())) { - copiedHeaders.put(entry.getKey(), entry.getValue()); - } - } - return copiedHeaders; - } - - static void removeHeaders(MessageHeaderAccessor accessor, List keysToRemove) { - for (String keyToRemove : keysToRemove) { - accessor.removeHeader(keyToRemove); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - if (accessor.isMutable()) { - // 1184 native headers can be an immutable map - ensureNativeHeadersAreMutable(nativeAccessor).removeNativeHeader(keyToRemove); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - ((Map) nativeHeaders).remove(keyToRemove); - } - } - } - } - - /** - * Since for some reason, the native headers sometimes are immutable even though the - * accessor says that the headers are mutable, then we have to ensure their - * mutability. We do so by first making a mutable copy of the native headers, then by - * removing the native headers from the headers map and replacing them with a mutable - * copy. Workaround for #1184 - * @param nativeAccessor accessor containing (or not) native headers - * @return modified accessor - */ - private static NativeMessageHeaderAccessor ensureNativeHeadersAreMutable( - NativeMessageHeaderAccessor nativeAccessor) { - Map> nativeHeaderMap = nativeAccessor.toNativeHeaderMap(); - nativeHeaderMap = nativeHeaderMap instanceof LinkedMultiValueMap ? nativeHeaderMap - : new LinkedMultiValueMap<>(nativeHeaderMap); - nativeAccessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaderMap); - return nativeAccessor; - } - - @Override - public void set(MessageHeaderAccessor accessor, String key, String value) { - try { - doPut(accessor, key, value); - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - } - - private void doPut(MessageHeaderAccessor accessor, String key, String value) { - accessor.setHeader(key, value); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - ensureNativeHeadersAreMutable(nativeAccessor).setNativeHeader(key, value); - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders == null) { - nativeHeaders = new LinkedMultiValueMap<>(); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); - } - if (nativeHeaders instanceof Map) { - Map> copy = toNativeHeaderMap((Map>) nativeHeaders); - copy.put(key, Collections.singletonList(value)); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, copy); - } - } - } - - private Map> toNativeHeaderMap(Map> map) { - return (map != null ? new LinkedMultiValueMap<>(map) : Collections.emptyMap()); - } - - @Override - public String toString() { - return "MessageHeaderPropagatorSetter{}"; - } - -} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java index b9f193cd6..a57f441ba 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java @@ -18,15 +18,14 @@ package org.springframework.cloud.function.observability; import io.micrometer.observation.ObservationRegistry; +import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** - * * @author Oleg Zhurakousky - * */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(ObservationRegistry.class) @@ -34,7 +33,7 @@ public class ObservationAutoConfiguration { @Bean @ConditionalOnMissingBean - public ObservationFunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry) { - return new ObservationFunctionAroundWrapper(registry); + public ObservationFunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider functionReceiverObservationConvention, ObjectProvider functionObservationConvention, ObjectProvider functionSenderObservationConvention) { + return new ObservationFunctionAroundWrapper(registry, functionReceiverObservationConvention.getIfAvailable(() -> null), functionObservationConvention.getIfAvailable(() -> null), functionSenderObservationConvention.getIfAvailable(() -> null)); } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java index c2d9ba832..ba18ac65c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java @@ -20,12 +20,13 @@ import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; - +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; /** @@ -39,10 +40,17 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper { private final ObservationRegistry observationRegistry; - private FunctionTagsProvider tagsProvider = new DefaultFunctionTagsProvider(); + private final FunctionReceiverObservationConvention functionReceiverObservationConvention; - public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry) { + private final FunctionObservationConvention functionObservationConvention; + + private final FunctionSenderObservationConvention functionSenderObservationConvention; + + public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry, @Nullable FunctionReceiverObservationConvention functionReceiverObservationConvention, @Nullable FunctionObservationConvention functionObservationConvention, @Nullable FunctionSenderObservationConvention functionSenderObservationConvention) { this.observationRegistry = observationRegistry; + this.functionReceiverObservationConvention = functionReceiverObservationConvention; + this.functionObservationConvention = functionObservationConvention; + this.functionSenderObservationConvention = functionSenderObservationConvention; } @Override @@ -50,39 +58,73 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper { if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) { return targetFunction.apply(message); // no instrumentation } - else if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) { - return reactorStream((Publisher) message, targetFunction); - } return nonReactorStream(message, targetFunction); } - private Object reactorStream(Publisher message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - // TODO - return message; - } - + @SuppressWarnings("unchecked") private Object nonReactorStream(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - FunctionContext context = new FunctionContext(targetFunction).withInput(message); - Object invocationMessage = context.getModifiedInput(); - Object result = Observation - .createNotStarted(FunctionObservation.FUNCTION_OBSERVATION.getName(), () -> context, this.observationRegistry) - .contextualName(FunctionObservation.FUNCTION_OBSERVATION.getContextualName()) - .observe(() -> { - Object r = message == null ? targetFunction.get() : targetFunction.apply(invocationMessage); - context.setModifiedOutput(r); - return r; - }); - if (result == null) { - if (log.isDebugEnabled()) { - log.debug("Returned message is null - we have a consumer"); - } - return null; + if (targetFunction.isConsumer()) { + Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction); + Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage); + return consumerObservation.observe(() -> targetFunction.apply(message)); + } + else if (targetFunction.isFunction()) { + Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction); + Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage); + Object outputMessage = consumerObservation.observe(() -> targetFunction.apply(message)); + if (isNonNullMessageType(outputMessage)) { + return outputMessage; // no instrumentation + } + return observeOutputMessage(outputMessage, targetFunction, consumerObservation); + } + else { + Object supplierOutputMessage = functionProcessingObservation(targetFunction).observe(targetFunction::get); + if (isNonNullMessageType(supplierOutputMessage)) { + return supplierOutputMessage; // no instrumentation + } + return observeOutputMessage(supplierOutputMessage, targetFunction, null); } - return context.getModifiedOutput(); } - public void setKeyValuesProvider(FunctionTagsProvider functionTagsProvider) { - this.tagsProvider = functionTagsProvider; + private Observation functionProcessingObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + return FunctionObservation.FUNCTION_PROCESSING_OBSERVATION.observation(this.functionObservationConvention, DefaultFunctionObservationConvention.INSTANCE, () -> new FunctionContext(targetFunction), this.observationRegistry); + } + + private Observation consumerObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Observation observationOfInputMessage) { + return functionProcessingObservation(targetFunction) + .parentObservation(observationOfInputMessage); + } + + private boolean isNonNullMessageType(Object outputMessage) { + return outputMessage == null || !(outputMessage instanceof Message); + } + + /** + * Confirmation of getting of message from broker. + * + * @param message message to process + * @param targetFunction target function + * @return stopped observation + */ + private Observation stoppedObservationOfInputMessage(Object message, + SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { + Observation consumerObservation = FunctionObservation.FUNCTION_CONSUMER_OBSERVATION.observation(this.functionReceiverObservationConvention, DefaultFunctionReceiverObservationConvention.INSTANCE, () -> new FunctionReceiverContext(targetFunction, (Message) message), this.observationRegistry); + consumerObservation.start().stop(); + return consumerObservation; + } + + /** + * Enriching the output message. + * + * @param message message to process + * @param targetFunction target function + * @return enriched output message + */ + private Message observeOutputMessage(Object message, + SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, @Nullable Observation parentObservation) { + FunctionSenderContext context = new FunctionSenderContext(targetFunction, MessageBuilder.fromMessage((Message) message)); + FunctionObservation.FUNCTION_PRODUCER_OBSERVATION.observation(this.functionSenderObservationConvention, DefaultFunctionSenderObservationConvention.INSTANCE, () -> context, this.observationRegistry).parentObservation(parentObservation).start().stop(); + return context.getCarrier().build(); } } diff --git a/spring-cloud-function-context/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-function-context/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index 3a0d2ce51..90873e09a 100644 --- a/spring-cloud-function-context/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/spring-cloud-function-context/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -2,5 +2,4 @@ org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConf org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration org.springframework.cloud.function.context.config.FunctionsEndpointAutoConfiguration -org.springframework.cloud.function.context.config.ObservabilityAutoConfiguration org.springframework.cloud.function.observability.ObservationAutoConfiguration diff --git a/spring-cloud-function-observability/.jdk8 b/spring-cloud-function-observability/.jdk8 deleted file mode 100644 index e69de29bb..000000000 diff --git a/spring-cloud-function-observability/pom.xml b/spring-cloud-function-observability/pom.xml deleted file mode 100644 index a28f02da9..000000000 --- a/spring-cloud-function-observability/pom.xml +++ /dev/null @@ -1,52 +0,0 @@ - - - 4.0.0 - - spring-cloud-function-observability - jar - Spring Cloud Function Observability - Spring Cloud Function Observability - - - org.springframework.cloud - spring-cloud-function-parent - 4.0.0-SNAPSHOT - - - - - org.springframework.cloud - spring-cloud-function-context - - - io.micrometer - micrometer-observation - - - io.micrometer - micrometer-core - - - io.micrometer - micrometer-tracing - - - io.micrometer - micrometer-observation-test - test - - - org.springframework.boot - spring-boot-actuator-autoconfigure - test - - - org.springframework.boot - spring-boot-starter-test - test - - - - diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java deleted file mode 100644 index 2032bfdc2..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionObservation.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import io.micrometer.common.docs.KeyName; -import io.micrometer.observation.docs.ObservationDocumentation; - -/** - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -enum FunctionObservation implements ObservationDocumentation { - /** - * Observation created around a function execution. - */ - FUNCTION_OBSERVATION { - @Override - public String getName() { - return "spring.cloud.function"; - } - - @Override - public String getContextualName() { - return "function"; - } - - @Override - public KeyName[] getLowCardinalityKeyNames() { - return FunctionLowCardinalityTags.values(); - } - - @Override - public String getPrefix() { - return "spring.cloud.function"; - } - }; - - enum FunctionLowCardinalityTags implements KeyName { - - /** - * Name of the function. - */ - FUNCTION_NAME { - public String asString() { - return "spring.cloud.function.definition"; - } - } - - } -} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java deleted file mode 100644 index 1758a4ce9..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/FunctionTracingObservationHandler.java +++ /dev/null @@ -1,335 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import io.micrometer.observation.Observation; -import io.micrometer.tracing.Span; -import io.micrometer.tracing.Tracer; -import io.micrometer.tracing.handler.TracingObservationHandler; -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; -import org.springframework.cloud.function.context.message.MessageUtils; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; -import org.springframework.messaging.support.ErrorMessage; -import org.springframework.messaging.support.GenericMessage; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.util.StringUtils; - - - -/** - * Function Tracing Observation Handler. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class FunctionTracingObservationHandler implements TracingObservationHandler { - - private static final Log log = LogFactory.getLog(FunctionTracingObservationHandler.class); - - /** - * Using the literal "broker" until we come up with a better solution. - * - *

- * If the message originated from a binder (consumer binding), there will be different - * headers present (e.g. "KafkaHeaders.RECEIVED_TOPIC" Vs. - * "AmqpHeaders.CONSUMER_QUEUE" (unless the application removes them before sending). - * These don't represent the broker, rather a queue, and in any case the heuristics - * are not great. At least we might be able to tell if this is rabbit or not (ex how - * spring-rabbit works). We need to think this through before making an api, possibly - * experimenting. - * - *

- * If the app is outbound only (producer), there's no indication of what type the - * destination broker is. This may hint at a non-manual solution being overwriting the - * remoteServiceName later, similar to how servlet instrumentation lazy set - * "http.route". - */ - private static final String REMOTE_SERVICE_NAME = "broker"; - - private final Tracer tracer; - - private final Propagator propagator; - - private final Propagator.Getter getter; - - private final Propagator.Setter setter; - - public FunctionTracingObservationHandler(Tracer tracer, Propagator propagator, MessageHeaderPropagatorGetter getter, MessageHeaderPropagatorSetter setter) { - this.tracer = tracer; - this.propagator = propagator; - this.getter = getter; - this.setter = setter; - } - - @Override - public void onStart(FunctionContext context) { - Message message = (Message) context.getInput(); - MessageAndSpans wrappedInputMessage = null; - SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction = context.getTargetFunction(); - Span functionSpan = null; - if (message == null && targetFunction.isSupplier()) { // Supplier - if (log.isDebugEnabled()) { - log.debug("Creating a span for a supplier"); - } - functionSpan = this.tracer.nextSpan().start(); - } - else { - if (log.isDebugEnabled()) { - log.debug("Will retrieve the tracing headers from the message"); - } - // This will create a handle span - wrappedInputMessage = wrapInputMessage(context, message); - if (log.isDebugEnabled()) { - log.debug("Wrapped input msg " + wrappedInputMessage); - } - functionSpan = wrappedInputMessage.childSpan; - } - context.put(MessageAndSpans.class, wrappedInputMessage); - // This is the function span - getTracingContext(context).setSpan(functionSpan); - } - - @Override - public void onStop(FunctionContext context) { - MessageAndSpans invocationMessage = context.get(MessageAndSpans.class); - Span functionSpan = getRequiredSpan(context); - functionSpan.name(context.getTargetFunction().getFunctionDefinition()).end(); - Object result = context.getOutput(); - Message msgResult = toMessage(result); - MessageAndSpan wrappedOutputMessage; - if (log.isDebugEnabled()) { - log.debug("Will instrument the output message"); - } - if (invocationMessage != null) { - wrappedOutputMessage = wrapOutputMessage(msgResult, invocationMessage.parentSpan, context); - } - else { - wrappedOutputMessage = wrapOutputMessage(msgResult, functionSpan, context); - } - if (log.isDebugEnabled()) { - log.debug("Wrapped output msg " + wrappedOutputMessage); - } - wrappedOutputMessage.span.end(); - context.setModifiedOutput(wrappedOutputMessage.msg); - } - -// String inputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-in-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-in-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } -// -// String outputDestination(String functionDefinition) { -// return this.functionToDestinationCache.computeIfAbsent(functionDefinition, s -> { -// String bindingMappingProperty = "spring.cloud.stream.function.bindings." + s + "-out-0"; -// String bindingProperty = this.environment.containsProperty(bindingMappingProperty) -// ? this.environment.getProperty(bindingMappingProperty) : s + "-out-0"; -// return this.environment.getProperty("spring.cloud.stream.bindings." + bindingProperty + ".destination", s); -// }); -// } - - private Message toMessage(Object result) { - if (!(result instanceof Message)) { - return MessageBuilder.withPayload(result).build(); - } - return (Message) result; - } - - /** - * Wraps the given input message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpans wrapInputMessage(FunctionContext context, Message message) { - MessageHeaderAccessor headers = mutableHeaderAccessor(message); - Span.Builder consumerSpanBuilder = this.propagator.extract(headers, this.getter); - Span handleSpan = consumerSpan(context, consumerSpanBuilder); - if (log.isDebugEnabled()) { - log.debug("Built a consumer span " + handleSpan); - } - Span functionSpan = tracer.nextSpan(handleSpan).name(context.getContextualName()).start(); - clearTracingHeaders(headers); - if (message instanceof ErrorMessage) { - return new MessageAndSpans(new ErrorMessage((Throwable) message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - headers.setImmutable(); - return new MessageAndSpans(new GenericMessage<>(message.getPayload(), headers.getMessageHeaders()), - handleSpan, functionSpan); - } - - // Handle span - private Span consumerSpan(FunctionContext context, Span.Builder consumerSpanBuilder) { - // TODO: Add this as a documented span - consumerSpanBuilder.kind(Span.Kind.CONSUMER).name("handle"); - consumerSpanBuilder.remoteServiceName(REMOTE_SERVICE_NAME); - // this is the consumer part of the producer->consumer mechanism - Span consumerSpan = consumerSpanBuilder.start(); - tagSpan(context, consumerSpan); - // we're ending this immediately just to have a properly nested graph - consumerSpan.end(); - return consumerSpan; - } - - private MessageHeaderAccessor mutableHeaderAccessor(Message message) { - MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, MessageHeaderAccessor.class); - if (accessor != null && accessor.isMutable()) { - return accessor; - } - MessageHeaderAccessor headers = MessageHeaderAccessor.getMutableAccessor(message); - headers.setLeaveMutable(true); - return headers; - } - - private void clearTracingHeaders(MessageHeaderAccessor headers) { - MessageHeaderPropagatorSetter.removeHeaders(headers, this.propagator.fields()); - } - - /** - * Wraps the given output message with tracing headers and returns a corresponding - * span. - * @param message - message to wrap - * @return a tuple with the wrapped message and a corresponding span - */ - private MessageAndSpan wrapOutputMessage(Message message, Span parentSpan, FunctionContext context) { - Message retrievedMessage = getMessage(message); - MessageHeaderAccessor headers = mutableHeaderAccessor(retrievedMessage); - Span.Builder sendSpanBuilder = tracer.spanBuilder().setParent(parentSpan.context()); - clearTracingHeaders(headers); - Span sendSpan = createProducerSpan(context, headers, sendSpanBuilder); - this.propagator.inject(sendSpan.context(), headers, this.setter); - if (log.isDebugEnabled()) { - log.debug("Created a new span output message " + sendSpanBuilder); - } - return new MessageAndSpan(outputMessage(message, retrievedMessage, headers), sendSpan); - } - - private Message getMessage(Message message) { - Object payload = message.getPayload(); - if (payload instanceof MessagingException e) { - Message failedMessage = e.getFailedMessage(); - return failedMessage != null ? failedMessage : message; - } - return message; - } - - private Span createProducerSpan(FunctionContext context, MessageHeaderAccessor headers, Span.Builder spanBuilder) { - // TODO: Add documented span for this - spanBuilder.kind(Span.Kind.PRODUCER).name("send").remoteServiceName(toRemoteServiceName(headers)); - Span span = spanBuilder.start(); - if (!span.isNoop()) { - tagSpan(context, span); - } - return span; - } - - private String toRemoteServiceName(MessageHeaderAccessor headers) { -// for (String key : headers.getMessageHeaders().keySet()) { -// if (key.startsWith("kafka_")) { -// return "kafka"; -// } -// else if (key.startsWith("amqp_")) { -// return "rabbitmq"; -// } -// } - String serviceName = (String) headers.getHeader(MessageUtils.TARGET_PROTOCOL); - if (!StringUtils.hasLength(serviceName)) { - serviceName = REMOTE_SERVICE_NAME; - } - return serviceName; - } - - private Message outputMessage(Message originalMessage, Message retrievedMessage, - MessageHeaderAccessor additionalHeaders) { - MessageHeaderAccessor headers = mutableHeaderAccessor(originalMessage); - if (originalMessage instanceof ErrorMessage errorMessage) { - headers.copyHeaders(MessageHeaderPropagatorSetter.copyHeaders(additionalHeaders.getMessageHeaders(), - this.propagator.fields())); - return new ErrorMessage(errorMessage.getPayload(), isWebSockets(headers) ? headers.getMessageHeaders() - : new MessageHeaders(headers.getMessageHeaders()), errorMessage.getOriginalMessage()); - } - headers.copyHeaders(additionalHeaders.getMessageHeaders()); - return new GenericMessage<>(retrievedMessage.getPayload(), - isWebSockets(headers) ? headers.getMessageHeaders() : new MessageHeaders(headers.getMessageHeaders())); - } - - private boolean isWebSockets(MessageHeaderAccessor headerAccessor) { - return headerAccessor.getMessageHeaders().containsKey("stompCommand") - || headerAccessor.getMessageHeaders().containsKey("simpMessageType"); - } - - @Override - public Tracer getTracer() { - return this.tracer; - } - -// @Override - public boolean supportsContext(Observation.Context context) { - return context instanceof FunctionContext && (((FunctionContext) context).getInput() instanceof Message); - } - - private static class MessageAndSpan { - - final Message msg; - - final Span span; - - MessageAndSpan(Message msg, Span span) { - this.msg = msg; - this.span = span; - } - - @Override - public String toString() { - return "MessageAndSpan{" + "msg=" + this.msg + ", span=" + this.span + '}'; - } - - } - - private static class MessageAndSpans { - - final Message msg; - - final Span parentSpan; - - final Span childSpan; - - MessageAndSpans(Message msg, Span parentSpan, Span childSpan) { - this.msg = msg; - this.parentSpan = parentSpan; - this.childSpan = childSpan; - } - - @Override - public String toString() { - return "MessageAndSpans{" + "msg=" + msg + ", parentSpan=" + parentSpan + ", childSpan=" + childSpan + '}'; - } - - } -} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java deleted file mode 100644 index ab219da81..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorGetter.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.StringUtils; - -/** - * Getter for Spring Integration based communication. - * - * This always sets native headers in defence of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @since 4.0.0 - */ -public class MessageHeaderPropagatorGetter implements Propagator.Getter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorGetter.class); - - @Override - public String get(MessageHeaderAccessor accessor, String key) { - try { - String value = doGet(accessor, key); - if (StringUtils.hasText(value)) { - return value; - } - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - return null; - } - - private String doGet(MessageHeaderAccessor accessor, String key) { - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - Map> nativeHeadersMap = nativeAccessor.toNativeHeaderMap(); - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - Map nativeHeadersMap = (Map) nativeHeaders; - if (!nativeHeadersMap.isEmpty()) { - return getFromNativeHeaders(nativeHeadersMap, key); - } - } - } - Set> headerEntries = accessor.getMessageHeaders().entrySet(); - return getFromHeaders(headerEntries, key); - } - - private String getFromHeaders(Set> headerEntries, String key) { - for (Map.Entry entry : headerEntries) { - if (entry.getKey().equalsIgnoreCase(key)) { - Object result = entry.getValue(); - if (result != null) { - if (result instanceof byte[]) { - return new String((byte[]) result, StandardCharsets.UTF_8); - } - return result.toString(); - } - } - } - return null; - } - - private String getFromNativeHeaders(Map nativeHeaders, String key) { - Set entrySet = nativeHeaders.entrySet(); - for (Map.Entry entries : entrySet) { - if (entries.getKey() instanceof String) { - String headersKey = (String) entries.getKey(); - if (headersKey.equalsIgnoreCase(key)) { - Object result = entries.getValue(); - if (result instanceof List && !((List) result).isEmpty()) { - return String.valueOf(((List) result).get(0)); - } - } - } - } - return null; - } - - @Override - public String toString() { - return "MessageHeaderPropagatorGetter{}"; - } - -} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java deleted file mode 100644 index 4ccfae2e2..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/MessageHeaderPropagatorSetter.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2013-2021 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import io.micrometer.tracing.propagation.Propagator; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.springframework.messaging.support.MessageHeaderAccessor; -import org.springframework.messaging.support.NativeMessageHeaderAccessor; -import org.springframework.util.LinkedMultiValueMap; - -/** - * Setter for Spring Integration based communication. - * - * This always sets native headers in defense of STOMP issues discussed here. - * - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class MessageHeaderPropagatorSetter implements Propagator.Setter { - - private static final Log log = LogFactory.getLog(MessageHeaderPropagatorSetter.class); - - static Map copyHeaders(Map headers, List headersToCopy) { - Map copiedHeaders = new HashMap<>(); - for (Map.Entry entry : headers.entrySet()) { - if (headersToCopy.contains(entry.getKey())) { - copiedHeaders.put(entry.getKey(), entry.getValue()); - } - } - return copiedHeaders; - } - - static void removeHeaders(MessageHeaderAccessor accessor, List keysToRemove) { - for (String keyToRemove : keysToRemove) { - accessor.removeHeader(keyToRemove); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - if (accessor.isMutable()) { - // 1184 native headers can be an immutable map - ensureNativeHeadersAreMutable(nativeAccessor).removeNativeHeader(keyToRemove); - } - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders instanceof Map) { - ((Map) nativeHeaders).remove(keyToRemove); - } - } - } - } - - /** - * Since for some reason, the native headers sometimes are immutable even though the - * accessor says that the headers are mutable, then we have to ensure their - * mutability. We do so by first making a mutable copy of the native headers, then by - * removing the native headers from the headers map and replacing them with a mutable - * copy. Workaround for #1184 - * @param nativeAccessor accessor containing (or not) native headers - * @return modified accessor - */ - private static NativeMessageHeaderAccessor ensureNativeHeadersAreMutable( - NativeMessageHeaderAccessor nativeAccessor) { - Map> nativeHeaderMap = nativeAccessor.toNativeHeaderMap(); - nativeHeaderMap = nativeHeaderMap instanceof LinkedMultiValueMap ? nativeHeaderMap - : new LinkedMultiValueMap<>(nativeHeaderMap); - nativeAccessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaderMap); - return nativeAccessor; - } - - @Override - public void set(MessageHeaderAccessor accessor, String key, String value) { - try { - doPut(accessor, key, value); - } - catch (Exception ex) { - if (log.isDebugEnabled()) { - log.debug("An exception happened when we tried to retrieve the [" + key + "] from message", ex); - } - } - } - - private void doPut(MessageHeaderAccessor accessor, String key, String value) { - accessor.setHeader(key, value); - if (accessor instanceof NativeMessageHeaderAccessor) { - NativeMessageHeaderAccessor nativeAccessor = (NativeMessageHeaderAccessor) accessor; - ensureNativeHeadersAreMutable(nativeAccessor).setNativeHeader(key, value); - } - else { - Object nativeHeaders = accessor.getHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS); - if (nativeHeaders == null) { - nativeHeaders = new LinkedMultiValueMap<>(); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, nativeHeaders); - } - if (nativeHeaders instanceof Map) { - Map> copy = toNativeHeaderMap((Map>) nativeHeaders); - copy.put(key, Collections.singletonList(value)); - accessor.setHeader(NativeMessageHeaderAccessor.NATIVE_HEADERS, copy); - } - } - } - - private Map> toNativeHeaderMap(Map> map) { - return (map != null ? new LinkedMultiValueMap<>(map) : Collections.emptyMap()); - } - - @Override - public String toString() { - return "MessageHeaderPropagatorSetter{}"; - } - -} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java deleted file mode 100644 index b9f193cd6..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2022-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import io.micrometer.observation.ObservationRegistry; - -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -/** - * - * @author Oleg Zhurakousky - * - */ -@Configuration(proxyBeanMethods = false) -@ConditionalOnClass(ObservationRegistry.class) -public class ObservationAutoConfiguration { - - @Bean - @ConditionalOnMissingBean - public ObservationFunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry) { - return new ObservationFunctionAroundWrapper(registry); - } -} diff --git a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java deleted file mode 100644 index c2d9ba832..000000000 --- a/spring-cloud-function-observability/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright 2012-2019 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import io.micrometer.observation.Observation; -import io.micrometer.observation.ObservationRegistry; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.reactivestreams.Publisher; - -import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; -import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; -import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry; - - - -/** - * @author Marcin Grzejszczak - * @author Oleg Zhurakousky - * @since 4.0.0 - */ -public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper { - - private static final Log log = LogFactory.getLog(ObservationFunctionAroundWrapper.class); - - private final ObservationRegistry observationRegistry; - - private FunctionTagsProvider tagsProvider = new DefaultFunctionTagsProvider(); - - public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry) { - this.observationRegistry = observationRegistry; - } - - @Override - protected Object doApply(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) { - return targetFunction.apply(message); // no instrumentation - } - else if (targetFunction.isInputTypePublisher() || targetFunction.isOutputTypePublisher()) { - return reactorStream((Publisher) message, targetFunction); - } - return nonReactorStream(message, targetFunction); - } - - private Object reactorStream(Publisher message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - // TODO - return message; - } - - private Object nonReactorStream(Object message, - SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { - FunctionContext context = new FunctionContext(targetFunction).withInput(message); - Object invocationMessage = context.getModifiedInput(); - Object result = Observation - .createNotStarted(FunctionObservation.FUNCTION_OBSERVATION.getName(), () -> context, this.observationRegistry) - .contextualName(FunctionObservation.FUNCTION_OBSERVATION.getContextualName()) - .observe(() -> { - Object r = message == null ? targetFunction.get() : targetFunction.apply(invocationMessage); - context.setModifiedOutput(r); - return r; - }); - if (result == null) { - if (log.isDebugEnabled()) { - log.debug("Returned message is null - we have a consumer"); - } - return null; - } - return context.getModifiedOutput(); - } - - public void setKeyValuesProvider(FunctionTagsProvider functionTagsProvider) { - this.tagsProvider = functionTagsProvider; - } -} diff --git a/spring-cloud-function-observability/src/main/resources/META-INF/spring.factories b/spring-cloud-function-observability/src/main/resources/META-INF/spring.factories deleted file mode 100644 index e69de29bb..000000000 diff --git a/spring-cloud-function-observability/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/spring-cloud-function-observability/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports deleted file mode 100644 index c23cf1c76..000000000 --- a/spring-cloud-function-observability/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ /dev/null @@ -1 +0,0 @@ -org.springframework.cloud.function.observability.ObservationAutoConfiguration diff --git a/spring-cloud-function-observability/src/test/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapperTests.java b/spring-cloud-function-observability/src/test/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapperTests.java deleted file mode 100644 index b6832dff7..000000000 --- a/spring-cloud-function-observability/src/test/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapperTests.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright 2022-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.cloud.function.observability; - -import java.util.function.Function; - -import io.micrometer.observation.ObservationRegistry; -import io.micrometer.observation.tck.TestObservationRegistry; -import io.micrometer.observation.tck.TestObservationRegistryAssert; -import org.junit.jupiter.api.Test; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; - -/** - * - * @author Oleg Zhurakousky - * - */ -public class ObservationFunctionAroundWrapperTests { - - @Test - public void testSingleObservation() { - try (ConfigurableApplicationContext context = SpringApplication.run(SampleConfiguration.class, "")) { - FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Function, Message> uppercase = catalog.lookup("uppercase", "application/json"); - Message result = uppercase.apply(MessageBuilder.withPayload("\"marcin\"").build()); - System.out.println("Result: " + result); - - TestObservationRegistry registry = context.getBean(TestObservationRegistry.class); - TestObservationRegistryAssert.then(registry).hasSingleObservationThat() - .hasNameEqualTo("spring.cloud.function"); - } - } - - @Configuration - @EnableAutoConfiguration - public static class SampleConfiguration { - - @Bean - public ObservationRegistry testRegistry() { - return TestObservationRegistry.create(); - } - -// @Bean -// public ObservationFunctionAroundWrapper wrapper(ObservationRegistry registry) { -// return new ObservationFunctionAroundWrapper(registry); -// } - - @Bean - public Function uppercase() { - return v -> v.toUpperCase(); - } - } -}