diff --git a/spring-cloud-function-context/pom.xml b/spring-cloud-function-context/pom.xml
index 4e4fe2c30..946be7832 100644
--- a/spring-cloud-function-context/pom.xml
+++ b/spring-cloud-function-context/pom.xml
@@ -147,6 +147,11 @@
micrometer-observation-test
test
+
+ io.micrometer
+ micrometer-tracing-bridge-otel
+ test
+
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java
index 7031862ce..a31e8719b 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/DefaultFunctionObservationConvention.java
@@ -43,6 +43,6 @@ public class DefaultFunctionObservationConvention implements FunctionObservation
@Override
public String getContextualName(FunctionContext context) {
- return context.getTargetFunction().getFunctionDefinition();
+ return context.getTargetFunction().getFunctionDefinition() + " process";
}
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java
index a0d7f1c63..e5bdd89be 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/FunctionContext.java
@@ -19,19 +19,24 @@ package org.springframework.cloud.function.observability;
import io.micrometer.observation.Observation;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
+import org.springframework.messaging.Message;
/**
* {@link Observation.Context} for function processing.
*
* @author Marcin Grzejszczak
+ * @author Oleg Zhurakousky
* @since 4.0.0
*/
public class FunctionContext extends Observation.Context {
private final SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction;
- public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
+ private final Message> message;
+
+ public FunctionContext(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Message> message) {
this.targetFunction = targetFunction;
+ this.message = message;
}
public SimpleFunctionRegistry.FunctionInvocationWrapper getTargetFunction() {
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java
index 2e4e41d37..c8e1b4f8e 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java
@@ -36,7 +36,11 @@ public class ObservationAutoConfiguration {
@Bean
@ConditionalOnMissingBean
@ConditionalOnBean(ObservationRegistry.class)
- public FunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider functionReceiverObservationConvention, ObjectProvider functionObservationConvention, ObjectProvider functionSenderObservationConvention) {
- return new ObservationFunctionAroundWrapper(registry, functionReceiverObservationConvention.getIfAvailable(() -> null), functionObservationConvention.getIfAvailable(() -> null), functionSenderObservationConvention.getIfAvailable(() -> null));
+ public FunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry,
+ ObjectProvider functionReceiverObservationConvention,
+ ObjectProvider functionObservationConvention,
+ ObjectProvider functionSenderObservationConvention) {
+ return new ObservationFunctionAroundWrapper(registry, functionReceiverObservationConvention.getIfAvailable(() -> null),
+ functionObservationConvention.getIfAvailable(() -> null), functionSenderObservationConvention.getIfAvailable(() -> null));
}
}
diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java
index 8a79b4a05..0d6ce5675 100644
--- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java
+++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java
@@ -46,7 +46,8 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
private final FunctionSenderObservationConvention functionSenderObservationConvention;
- public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry, @Nullable FunctionReceiverObservationConvention functionReceiverObservationConvention, @Nullable FunctionObservationConvention functionObservationConvention, @Nullable FunctionSenderObservationConvention functionSenderObservationConvention) {
+ public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry, @Nullable FunctionReceiverObservationConvention functionReceiverObservationConvention,
+ @Nullable FunctionObservationConvention functionObservationConvention, @Nullable FunctionSenderObservationConvention functionSenderObservationConvention) {
this.observationRegistry = observationRegistry;
this.functionReceiverObservationConvention = functionReceiverObservationConvention;
this.functionObservationConvention = functionObservationConvention;
@@ -58,19 +59,19 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) {
return targetFunction.apply(message); // no instrumentation
}
- return nonReactorStream(message, targetFunction);
+ return nonReactorStream((Message>) message, targetFunction);
}
- private Object nonReactorStream(Object message,
+ private Object nonReactorStream(Message> message,
SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
if (targetFunction.isConsumer()) {
Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction);
- Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage);
+ Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage, message);
return consumerObservation.observe(() -> targetFunction.apply(message));
}
else if (targetFunction.isFunction()) {
Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction);
- Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage);
+ Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage, message);
Object outputMessage = consumerObservation.observe(() -> targetFunction.apply(message));
if (isNonNullMessageType(outputMessage)) {
return outputMessage; // no instrumentation
@@ -78,7 +79,7 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
return observeOutputMessage(outputMessage, targetFunction, consumerObservation);
}
else {
- Object supplierOutputMessage = functionProcessingObservation(targetFunction).observe(targetFunction::get);
+ Object supplierOutputMessage = functionProcessingObservation(targetFunction, message).observe(targetFunction::get);
if (isNonNullMessageType(supplierOutputMessage)) {
return supplierOutputMessage; // no instrumentation
}
@@ -86,12 +87,12 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
}
}
- private Observation functionProcessingObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
- return FunctionObservation.FUNCTION_PROCESSING_OBSERVATION.observation(this.functionObservationConvention, DefaultFunctionObservationConvention.INSTANCE, () -> new FunctionContext(targetFunction), this.observationRegistry);
+ private Observation functionProcessingObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Message> message) {
+ return FunctionObservation.FUNCTION_PROCESSING_OBSERVATION.observation(this.functionObservationConvention, DefaultFunctionObservationConvention.INSTANCE, () -> new FunctionContext(targetFunction, message), this.observationRegistry);
}
- private Observation consumerObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Observation observationOfInputMessage) {
- return functionProcessingObservation(targetFunction)
+ private Observation consumerObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Observation observationOfInputMessage, Message> message) {
+ return functionProcessingObservation(targetFunction, message)
.parentObservation(observationOfInputMessage);
}