|
|
|
|
@@ -46,7 +46,8 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
|
|
|
|
|
|
|
|
|
|
private final FunctionSenderObservationConvention functionSenderObservationConvention;
|
|
|
|
|
|
|
|
|
|
public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry, @Nullable FunctionReceiverObservationConvention functionReceiverObservationConvention, @Nullable FunctionObservationConvention functionObservationConvention, @Nullable FunctionSenderObservationConvention functionSenderObservationConvention) {
|
|
|
|
|
public ObservationFunctionAroundWrapper(ObservationRegistry observationRegistry, @Nullable FunctionReceiverObservationConvention functionReceiverObservationConvention,
|
|
|
|
|
@Nullable FunctionObservationConvention functionObservationConvention, @Nullable FunctionSenderObservationConvention functionSenderObservationConvention) {
|
|
|
|
|
this.observationRegistry = observationRegistry;
|
|
|
|
|
this.functionReceiverObservationConvention = functionReceiverObservationConvention;
|
|
|
|
|
this.functionObservationConvention = functionObservationConvention;
|
|
|
|
|
@@ -58,19 +59,19 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
|
|
|
|
|
if (FunctionTypeUtils.isCollectionOfMessage(targetFunction.getOutputType())) {
|
|
|
|
|
return targetFunction.apply(message); // no instrumentation
|
|
|
|
|
}
|
|
|
|
|
return nonReactorStream(message, targetFunction);
|
|
|
|
|
return nonReactorStream((Message<?>) message, targetFunction);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Object nonReactorStream(Object message,
|
|
|
|
|
private Object nonReactorStream(Message<?> message,
|
|
|
|
|
SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
|
|
|
|
|
if (targetFunction.isConsumer()) {
|
|
|
|
|
Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction);
|
|
|
|
|
Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage);
|
|
|
|
|
Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage, message);
|
|
|
|
|
return consumerObservation.observe(() -> targetFunction.apply(message));
|
|
|
|
|
}
|
|
|
|
|
else if (targetFunction.isFunction()) {
|
|
|
|
|
Observation observationOfInputMessage = stoppedObservationOfInputMessage(message, targetFunction);
|
|
|
|
|
Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage);
|
|
|
|
|
Observation consumerObservation = consumerObservation(targetFunction, observationOfInputMessage, message);
|
|
|
|
|
Object outputMessage = consumerObservation.observe(() -> targetFunction.apply(message));
|
|
|
|
|
if (isNonNullMessageType(outputMessage)) {
|
|
|
|
|
return outputMessage; // no instrumentation
|
|
|
|
|
@@ -78,7 +79,7 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
|
|
|
|
|
return observeOutputMessage(outputMessage, targetFunction, consumerObservation);
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
Object supplierOutputMessage = functionProcessingObservation(targetFunction).observe(targetFunction::get);
|
|
|
|
|
Object supplierOutputMessage = functionProcessingObservation(targetFunction, message).observe(targetFunction::get);
|
|
|
|
|
if (isNonNullMessageType(supplierOutputMessage)) {
|
|
|
|
|
return supplierOutputMessage; // no instrumentation
|
|
|
|
|
}
|
|
|
|
|
@@ -86,12 +87,12 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Observation functionProcessingObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
|
|
|
|
|
return FunctionObservation.FUNCTION_PROCESSING_OBSERVATION.observation(this.functionObservationConvention, DefaultFunctionObservationConvention.INSTANCE, () -> new FunctionContext(targetFunction), this.observationRegistry);
|
|
|
|
|
private Observation functionProcessingObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Message<?> message) {
|
|
|
|
|
return FunctionObservation.FUNCTION_PROCESSING_OBSERVATION.observation(this.functionObservationConvention, DefaultFunctionObservationConvention.INSTANCE, () -> new FunctionContext(targetFunction, message), this.observationRegistry);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Observation consumerObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Observation observationOfInputMessage) {
|
|
|
|
|
return functionProcessingObservation(targetFunction)
|
|
|
|
|
private Observation consumerObservation(SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction, Observation observationOfInputMessage, Message<?> message) {
|
|
|
|
|
return functionProcessingObservation(targetFunction, message)
|
|
|
|
|
.parentObservation(observationOfInputMessage);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|