diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionAroundWrapper.java index 9e59c3eb6..814ad9201 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionAroundWrapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionAroundWrapper.java @@ -16,10 +16,9 @@ package org.springframework.cloud.function.context.catalog; -import java.util.function.BiFunction; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.messaging.Message; @@ -36,16 +35,15 @@ import org.springframework.util.StringUtils; * @author Oleg Zhurakousky * @since 3.1 */ -public abstract class FunctionAroundWrapper implements BiFunction { +public abstract class FunctionAroundWrapper { private static final Log log = LogFactory.getLog(FunctionAroundWrapper.class); - @Override public final Object apply(Object input, FunctionInvocationWrapper targetFunction) { String functionalTracingEnabledStr = System.getProperty("spring.sleuth.function.enabled"); boolean functionalTracingEnabled = StringUtils.hasText(functionalTracingEnabledStr) ? Boolean.parseBoolean(functionalTracingEnabledStr) : true; - if (functionalTracingEnabled) { + if (functionalTracingEnabled && !(input instanceof Publisher) && input instanceof Message) { boolean isSkipOutputConversion = targetFunction.isSkipOutputConversion(); targetFunction.setSkipOutputConversion(true); try { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 6b30c2325..a9b2d4029 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -442,6 +442,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry { this.skipOutputConversion = function.skipOutputConversion; this.skipInputConversion = function.skipInputConversion; this.target = function.target; + this.propagateInputHeaders = function.propagateInputHeaders; + this.composed = function.composed; this.inputType = function.inputType; this.outputType = function.outputType; this.functionDefinition = function.functionDefinition; @@ -499,6 +501,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry { return this.enhancer; } + public Type getOutputType() { + return this.outputType; + } + /** * !!! INTERNAL USE ONLY !!! * This is primarily to support s-c-Stream's ability to access @@ -514,10 +520,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry { return target; } - public Type getOutputType() { - return this.outputType; - } - public Type getInputType() { return this.inputType; } @@ -1139,7 +1141,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry { if (this.skipOutputConversion) { return output; } - if (functionAroundWrapper == null && output instanceof Message && isExtractPayload((Message) output, type)) { + if (/*functionAroundWrapper == null && */ output instanceof Message && isExtractPayload((Message) output, type)) { output = ((Message) output).getPayload(); } if (!(output instanceof Publisher) && this.enhancer != null) { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java index a57f441ba..2e4e41d37 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationAutoConfiguration.java @@ -19,8 +19,10 @@ package org.springframework.cloud.function.observability; import io.micrometer.observation.ObservationRegistry; import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.function.context.catalog.FunctionAroundWrapper; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,7 +35,8 @@ public class ObservationAutoConfiguration { @Bean @ConditionalOnMissingBean - public ObservationFunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider functionReceiverObservationConvention, ObjectProvider functionObservationConvention, ObjectProvider functionSenderObservationConvention) { + @ConditionalOnBean(ObservationRegistry.class) + public FunctionAroundWrapper observationFunctionAroundWrapper(ObservationRegistry registry, ObjectProvider functionReceiverObservationConvention, ObjectProvider functionObservationConvention, ObjectProvider functionSenderObservationConvention) { return new ObservationFunctionAroundWrapper(registry, functionReceiverObservationConvention.getIfAvailable(() -> null), functionObservationConvention.getIfAvailable(() -> null), functionSenderObservationConvention.getIfAvailable(() -> null)); } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java index ba18ac65c..8a79b4a05 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/observability/ObservationFunctionAroundWrapper.java @@ -61,7 +61,6 @@ public class ObservationFunctionAroundWrapper extends FunctionAroundWrapper { return nonReactorStream(message, targetFunction); } - @SuppressWarnings("unchecked") private Object nonReactorStream(Object message, SimpleFunctionRegistry.FunctionInvocationWrapper targetFunction) { if (targetFunction.isConsumer()) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index f75304055..ebeb9dd94 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -53,6 +53,7 @@ import reactor.util.function.Tuple3; import reactor.util.function.Tuples; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.FunctionRegistration; @@ -1020,6 +1021,7 @@ public class BeanFactoryAwareFunctionRegistryTests { } @Bean + @ConditionalOnMissingBean public FunctionAroundWrapper wrapper() { return new FunctionAroundWrapper() {