GH-1023 Add support for post function processing

This commit is contained in:
Oleg Zhurakousky
2023-04-13 13:49:56 +02:00
parent 4ba5ea3452
commit 278d917543
3 changed files with 90 additions and 1 deletions

View File

@@ -0,0 +1,56 @@
/*
* Copyright 2023-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context;
import java.util.function.Function;
import org.springframework.messaging.Message;
/**
* Strategy for implementing function with post processing behavior.
* <br>
* The core framework only provides support for the post-processing behavior.
* The actual invocation of post-processing is left to the end user or the framework which
* integrates Spring Cloud Function. This is because post-processing can mean different things
* in different execution contexts. See {@link #postProcess(Message)} method for more information.
*
* @param <I> - input type
* @param <O> - output type
*
* @author Oleg Zhurakousky
* @since 4.0.3
*
*/
public interface PostProcessingFunction<I, O> extends Function<I, O> {
/**
* Will post process the result of this's function invocation after this function has been triggered.
* <br>
* This operation is not managed/invoked by the core functionality of the Spring Cloud Function.
* It is specifically designed as a hook for other frameworks and extensions to invoke after
* this function was "triggered" and there is a requirement to do some post processing. The word "triggered"
* can mean different things in different execution contexts. For example, in spring-cloud-stream it means
* that the function has been invoked and the result of the function has been sent to the target destination.
*
* The boolean value argument - 'success' - allows the triggering framework to signal success or
* failure of its triggering operation whatever that may mean.
*
* @param result - the result of function invocation as an instance of {@link Message} including all the metadata as message headers.
*/
default void postProcess(Message<O> result) {
}
}

View File

@@ -52,6 +52,7 @@ import org.springframework.cloud.function.context.FunctionProperties;
import org.springframework.cloud.function.context.FunctionProperties.FunctionConfigurationProperties;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.PostProcessingFunction;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.core.FunctionInvocationHelper;
import org.springframework.cloud.function.json.JsonMapper;
@@ -414,6 +415,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
private boolean wrapped;
private final ThreadLocal<Message<Object>> unconvertedResult = new ThreadLocal<>();
private PostProcessingFunction postProcessor;
/*
* This is primarily to support Stream's ability to access
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
@@ -425,6 +430,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
private boolean wrappedBiConsumer;
FunctionInvocationWrapper(String functionDefinition, Object target, Type inputType, Type outputType) {
if (target instanceof PostProcessingFunction) {
this.postProcessor = (PostProcessingFunction) target;
}
this.target = target;
this.inputType = this.normalizeType(inputType);
this.outputType = this.normalizeType(outputType);
@@ -441,6 +449,25 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
}
}
@SuppressWarnings("unchecked")
public void postProcess() {
if (this.postProcessor != null) {
Message result = this.unconvertedResult.get();
if (result != null) {
try {
this.postProcessor.postProcess(result);
}
catch (Exception ex) {
logger.warn("Failed to post process function "
+ this.functionDefinition + "; Result of the invocation before post processing is " + result, ex);
}
finally {
this.unconvertedResult.remove();
}
}
}
}
public boolean isWrappedBiConsumer() {
return wrappedBiConsumer;
}
@@ -652,6 +679,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
String composedName = this.functionDefinition + "|" + afterWrapper.functionDefinition;
FunctionInvocationWrapper composedFunction = invocationWrapperInstance(composedName, rawComposedFunction, composedFunctionType);
composedFunction.composed = true;
if (((FunctionInvocationWrapper) after).target instanceof PostProcessingFunction) {
composedFunction.postProcessor = (PostProcessingFunction) ((FunctionInvocationWrapper) after).target;
}
return (Function<Object, V>) composedFunction;
}
@@ -704,6 +734,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
result = this.invokeFunction(convertedInput);
}
if (this.postProcessor != null) {
this.unconvertedResult.set((Message<Object>) result);
}
if (result != null && this.outputType != null) {
result = this.convertOutputIfNecessary(result, this.outputType, this.expectedOutputContentType);
}