Consolidate mechanisms around retaining headers and skipping input/output conversion
This commit is contained in:
@@ -33,9 +33,21 @@ public class FunctionProperties {
|
||||
public final static String PREFIX = "spring.cloud.function";
|
||||
|
||||
/**
|
||||
* Name of the header to be used to instruct function catalog to skip type conversion.
|
||||
* Name of the header to be used to instruct function catalog to skip input type conversion.
|
||||
* @deprecated since 3.1. Use #SKIP_INPUT_CONVERSION_HEADER
|
||||
*/
|
||||
public final static String SKIP_CONVERSION_HEADER = "skip-type-conversion";
|
||||
@Deprecated
|
||||
public final static String SKIP_CONVERSION_HEADER = "skip-input-type-conversion";
|
||||
|
||||
/**
|
||||
* Name of the header to be used to instruct function catalog to skip input type conversion.
|
||||
*/
|
||||
public final static String SKIP_INPUT_CONVERSION_HEADER = "skip-input-type-conversion";
|
||||
|
||||
/**
|
||||
* Name of the header to be used to instruct function catalog to skip output type conversion.
|
||||
*/
|
||||
public final static String SKIP_OUTPUT_CONVERSION_HEADER = "skip-output-type-conversion";
|
||||
|
||||
/**
|
||||
* Name of the header to be used to instruct function to apply this content type for output conversion.
|
||||
|
||||
@@ -291,6 +291,13 @@ public final class FunctionTypeUtils {
|
||||
return TypeResolver.resolveRawClass(type, null) == Flux.class;
|
||||
}
|
||||
|
||||
public static boolean isCollectionOfMessage(Type type) {
|
||||
if (isMessage(type)) {
|
||||
return isTypeCollection(type);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public static boolean isMessage(Type type) {
|
||||
if (isPublisher(type)) {
|
||||
type = getImmediateGenericType(type, 0);
|
||||
|
||||
@@ -504,22 +504,32 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object enrichInvocationResultIfNecessary(Object input, Object result) {
|
||||
// TODO we need to investigate this further. This effectively states that if `scf-func-name` present
|
||||
// wrap the result in a message regardless and copy all the headers from the incoming message.
|
||||
// Used in SupplierExporter
|
||||
if (input instanceof Message && ((Message) input).getHeaders().containsKey("scf-func-name")) {
|
||||
if (result != null && !(result instanceof Publisher) && input instanceof Message) {
|
||||
if (result instanceof Message) {
|
||||
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
|
||||
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
|
||||
headersMap.putAll(((Message) input).getHeaders());
|
||||
headersMap.putAll(this.sanitizeHeaders(((Message) input).getHeaders()));
|
||||
}
|
||||
else {
|
||||
result = MessageBuilder.withPayload(result).copyHeaders(((Message) input).getHeaders()).build();
|
||||
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build();
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Will ensure no headers with null values are copied.
|
||||
*/
|
||||
private Map<String, Object> sanitizeHeaders(MessageHeaders headers) {
|
||||
Map<String, Object> sanitizedHeaders = new HashMap<>();
|
||||
headers.forEach((k, v) -> {
|
||||
if (v != null) {
|
||||
sanitizedHeaders.put(k, v);
|
||||
}
|
||||
});
|
||||
return sanitizedHeaders;
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
@@ -649,6 +659,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
+ this.functionDefinition + "' since it's input type is Void and as such it is treated as Supplier.");
|
||||
input = null;
|
||||
}
|
||||
if (this.isSkipConversionHeaderSet(input, true)) {
|
||||
return input;
|
||||
}
|
||||
|
||||
if (FunctionTypeUtils.isMultipleArgumentType(type)) {
|
||||
Type[] inputTypes = ((ParameterizedType) type).getActualTypeArguments();
|
||||
@@ -690,6 +703,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
* set as a header in a message or explicitly provided as part of the lookup.
|
||||
*/
|
||||
private Object convertOutputIfNecessary(Object output, Type type, String[] contentType) {
|
||||
if (this.isSkipConversionHeaderSet(output, false)) {
|
||||
return output;
|
||||
}
|
||||
if (output instanceof Message && !this.containsRetainMessageSignalInHeaders((Message) output)) {
|
||||
if (!FunctionTypeUtils.isMessage(type)) {
|
||||
output = ((Message) output).getPayload();
|
||||
}
|
||||
else if (FunctionTypeUtils.isMessage(type) && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type))) {
|
||||
output = ((Message) output).getPayload();
|
||||
}
|
||||
}
|
||||
|
||||
if (!(output instanceof Publisher) && this.enhancer != null) {
|
||||
output = enhancer.apply(output);
|
||||
}
|
||||
@@ -714,6 +739,44 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
return convertedOutput;
|
||||
}
|
||||
|
||||
/*
|
||||
* This header may be set by a framework that uses s-c-function but does not want to rely on type
|
||||
* conversion mechanism provided by s-c-function
|
||||
*/
|
||||
private boolean isSkipConversionHeaderSet(Object value, boolean input) {
|
||||
if (value instanceof Message) {
|
||||
Message message = (Message) value;
|
||||
String headerName = input ? FunctionProperties.SKIP_INPUT_CONVERSION_HEADER : FunctionProperties.SKIP_OUTPUT_CONVERSION_HEADER;
|
||||
if (message.getHeaders().containsKey(headerName)) {
|
||||
Object skipValue = message.getHeaders().get(headerName);
|
||||
boolean skip = skipValue instanceof Boolean ? (boolean) skipValue : Boolean.parseBoolean((String) skipValue);
|
||||
return skip;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Will check if message contains any of the headers that are considered to serve as
|
||||
* signals to retain output as Message (regardless of the output type of function).
|
||||
* At this moment presence of 'scf-func-name' header or any header that begins with `lambda'
|
||||
* (use by AWS) will result in this method returning true.
|
||||
*/
|
||||
/*
|
||||
* TODO we need to investigate if this could be extracted into some type of strategy since at
|
||||
* the pure core level there is no case for this to ever be true. In fact today it is only AWS Lambda
|
||||
* case that requires it since it may contain forwarding url
|
||||
*/
|
||||
private boolean containsRetainMessageSignalInHeaders(Message message) {
|
||||
for (String headerName : message.getHeaders().keySet()) {
|
||||
if (headerName.startsWith("lambda") ||
|
||||
headerName.startsWith("scf-func-name")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
*
|
||||
*/
|
||||
@@ -883,11 +946,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) {
|
||||
Type actualType = type != null ? FunctionTypeUtils.getGenericType(type) : type;
|
||||
//Type actualType = type;// != null ? FunctionTypeUtils.getGenericType(type) : type;
|
||||
return publisher instanceof Mono
|
||||
? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, actualType, expectedOutputContentType))
|
||||
? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
||||
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex))
|
||||
: Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, actualType, expectedOutputContentType))
|
||||
: Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
||||
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user