Polishing previous observability comit

This commit is contained in:
Oleg Zhurakousky
2022-10-20 11:12:58 +02:00
parent d4917dc8a7
commit 857fa022f4
5 changed files with 16 additions and 12 deletions

View File

@@ -16,10 +16,9 @@
package org.springframework.cloud.function.context.catalog;
import java.util.function.BiFunction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.messaging.Message;
@@ -36,16 +35,15 @@ import org.springframework.util.StringUtils;
* @author Oleg Zhurakousky
* @since 3.1
*/
public abstract class FunctionAroundWrapper implements BiFunction<Object, FunctionInvocationWrapper, Object> {
public abstract class FunctionAroundWrapper {
private static final Log log = LogFactory.getLog(FunctionAroundWrapper.class);
@Override
public final Object apply(Object input, FunctionInvocationWrapper targetFunction) {
String functionalTracingEnabledStr = System.getProperty("spring.sleuth.function.enabled");
boolean functionalTracingEnabled = StringUtils.hasText(functionalTracingEnabledStr)
? Boolean.parseBoolean(functionalTracingEnabledStr) : true;
if (functionalTracingEnabled) {
if (functionalTracingEnabled && !(input instanceof Publisher) && input instanceof Message) {
boolean isSkipOutputConversion = targetFunction.isSkipOutputConversion();
targetFunction.setSkipOutputConversion(true);
try {

View File

@@ -442,6 +442,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
this.skipOutputConversion = function.skipOutputConversion;
this.skipInputConversion = function.skipInputConversion;
this.target = function.target;
this.propagateInputHeaders = function.propagateInputHeaders;
this.composed = function.composed;
this.inputType = function.inputType;
this.outputType = function.outputType;
this.functionDefinition = function.functionDefinition;
@@ -499,6 +501,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
return this.enhancer;
}
public Type getOutputType() {
return this.outputType;
}
/**
* !!! INTERNAL USE ONLY !!!
* This is primarily to support s-c-Stream's ability to access
@@ -514,10 +520,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
return target;
}
public Type getOutputType() {
return this.outputType;
}
public Type getInputType() {
return this.inputType;
}
@@ -1139,7 +1141,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
if (this.skipOutputConversion) {
return output;
}
if (functionAroundWrapper == null && output instanceof Message && isExtractPayload((Message<?>) output, type)) {
if (/*functionAroundWrapper == null && */ output instanceof Message && isExtractPayload((Message<?>) output, type)) {
output = ((Message) output).getPayload();
}
if (!(output instanceof Publisher) && this.enhancer != null) {

View File

@@ -19,8 +19,10 @@ package org.springframework.cloud.function.observability;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -33,7 +35,8 @@ public class ObservationAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public ObservationFunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider<FunctionReceiverObservationConvention> functionReceiverObservationConvention, ObjectProvider<FunctionObservationConvention> functionObservationConvention, ObjectProvider<FunctionSenderObservationConvention> functionSenderObservationConvention) {
@ConditionalOnBean(ObservationRegistry.class)
public FunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider<FunctionReceiverObservationConvention> functionReceiverObservationConvention, ObjectProvider<FunctionObservationConvention> functionObservationConvention, ObjectProvider<FunctionSenderObservationConvention> functionSenderObservationConvention) {
return new ObservationFunctionAroundWrapper(registry, functionReceiverObservationConvention.getIfAvailable(() -> null), functionObservationConvention.getIfAvailable(() -> null), functionSenderObservationConvention.getIfAvailable(() -> null));
}
}

View File

@@ -61,7 +61,6 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper {
return nonReactorStream(message, targetFunction);
}
@SuppressWarnings("unchecked")
private Object nonReactorStream(Object message,
SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) {
if (targetFunction.isConsumer()) {

View File

@@ -53,6 +53,7 @@ import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.FunctionRegistration;
@@ -1020,6 +1021,7 @@ public class BeanFactoryAwareFunctionRegistryTests {
}
@Bean
@ConditionalOnMissingBean
public FunctionAroundWrapper wrapper() {
return new FunctionAroundWrapper() {