diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java index a7908b5ee..16282a9e0 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java @@ -33,7 +33,6 @@ import org.springframework.cloud.function.json.JsonMapper; import org.springframework.http.HttpStatus; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.StreamUtils; diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PostProcessingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PostProcessingFunction.java new file mode 100644 index 000000000..e52c1b17f --- /dev/null +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/PostProcessingFunction.java @@ -0,0 +1,56 @@ +/* + * Copyright 2023-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.context; + +import java.util.function.Function; + +import org.springframework.messaging.Message; + +/** + * Strategy for implementing function with post processing behavior. + *
+ * The core framework only provides support for the post-processing behavior. + * The actual invocation of post-processing is left to the end user or the framework which + * integrates Spring Cloud Function. This is because post-processing can mean different things + * in different execution contexts. See {@link #postProcess(Message)} method for more information. + * + * @param - input type + * @param - output type + * + * @author Oleg Zhurakousky + * @since 4.0.3 + * + */ +public interface PostProcessingFunction extends Function { + + /** + * Will post process the result of this's function invocation after this function has been triggered. + *
+ * This operation is not managed/invoked by the core functionality of the Spring Cloud Function. + * It is specifically designed as a hook for other frameworks and extensions to invoke after + * this function was "triggered" and there is a requirement to do some post processing. The word "triggered" + * can mean different things in different execution contexts. For example, in spring-cloud-stream it means + * that the function has been invoked and the result of the function has been sent to the target destination. + * + * The boolean value argument - 'success' - allows the triggering framework to signal success or + * failure of its triggering operation whatever that may mean. + * + * @param result - the result of function invocation as an instance of {@link Message} including all the metadata as message headers. + */ + default void postProcess(Message result) { + } +} diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 7f48db799..15fc414c8 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -52,6 +52,7 @@ import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionProperties.FunctionConfigurationProperties; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; +import org.springframework.cloud.function.context.PostProcessingFunction; import org.springframework.cloud.function.context.config.RoutingFunction; import org.springframework.cloud.function.core.FunctionInvocationHelper; import org.springframework.cloud.function.json.JsonMapper; @@ -414,6 +415,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry { private boolean wrapped; + private final ThreadLocal> unconvertedResult = new ThreadLocal<>(); + + private PostProcessingFunction postProcessor; + /* * This is primarily to support Stream's ability to access * un-converted payload (e.g., to evaluate expression on some attribute of a payload) @@ -425,6 +430,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry { private boolean wrappedBiConsumer; FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) { + if (target instanceof PostProcessingFunction) { + this.postProcessor = (PostProcessingFunction) target; + } this.target = target; this.inputType = this.normalizeType(inputType); this.outputType = this.normalizeType(outputType); @@ -441,6 +449,25 @@ public class SimpleFunctionRegistry implements FunctionRegistry { } } + @SuppressWarnings("unchecked") + public void postProcess() { + if (this.postProcessor != null) { + Message result = this.unconvertedResult.get(); + if (result != null) { + try { + this.postProcessor.postProcess(result); + } + catch (Exception ex) { + logger.warn("Failed to post process function " + + this.functionDefinition + "; Result of the invocation before post processing is " + result, ex); + } + finally { + this.unconvertedResult.remove(); + } + } + } + } + public boolean isWrappedBiConsumer() { return wrappedBiConsumer; } @@ -652,6 +679,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry { String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition; FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType); composedFunction.composed = true; + if (((FunctionInvocationWrapper) after).target instanceof PostProcessingFunction) { + composedFunction.postProcessor = (PostProcessingFunction) ((FunctionInvocationWrapper) after).target; + } return (Function) composedFunction; } @@ -704,6 +734,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry { result = this.invokeFunction(convertedInput); } + if (this.postProcessor != null) { + this.unconvertedResult.set((Message) result); + } + if (result != null && this.outputType != null) { result = this.convertOutputIfNecessary(result, this.outputType, this.expectedOutputContentType); }