diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java index 59b3a64b5..2ccd3518c 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -33,9 +33,21 @@ public class FunctionProperties { public final static String PREFIX = "spring.cloud.function"; /** - * Name of the header to be used to instruct function catalog to skip type conversion. + * Name of the header to be used to instruct function catalog to skip input type conversion. + * @deprecated since 3.1. Use #SKIP_INPUT_CONVERSION_HEADER */ - public final static String SKIP_CONVERSION_HEADER = "skip-type-conversion"; + @Deprecated + public final static String SKIP_CONVERSION_HEADER = "skip-input-type-conversion"; + + /** + * Name of the header to be used to instruct function catalog to skip input type conversion. + */ + public final static String SKIP_INPUT_CONVERSION_HEADER = "skip-input-type-conversion"; + + /** + * Name of the header to be used to instruct function catalog to skip output type conversion. + */ + public final static String SKIP_OUTPUT_CONVERSION_HEADER = "skip-output-type-conversion"; /** * Name of the header to be used to instruct function to apply this content type for output conversion. diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java index bed482773..af8027a38 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/FunctionTypeUtils.java @@ -291,6 +291,13 @@ public final class FunctionTypeUtils { return TypeResolver.resolveRawClass(type, null) == Flux.class; } + public static boolean isCollectionOfMessage(Type type) { + if (isMessage(type)) { + return isTypeCollection(type); + } + return false; + } + public static boolean isMessage(Type type) { if (isPublisher(type)) { type = getImmediateGenericType(type, 0); diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index cd5675b72..9088cb77f 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -504,22 +504,32 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object enrichInvocationResultIfNecessary(Object input, Object result) { - // TODO we need to investigate this further. This effectively states that if `scf-func-name` present - // wrap the result in a message regardless and copy all the headers from the incoming message. - // Used in SupplierExporter - if (input instanceof Message && ((Message) input).getHeaders().containsKey("scf-func-name")) { + if (result != null && !(result instanceof Publisher) && input instanceof Message) { if (result instanceof Message) { Map headersMap = (Map) ReflectionUtils .getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders()); - headersMap.putAll(((Message) input).getHeaders()); + headersMap.putAll(this.sanitizeHeaders(((Message) input).getHeaders())); } else { - result = MessageBuilder.withPayload(result).copyHeaders(((Message) input).getHeaders()).build(); + result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); } } return result; } + /* + * Will ensure no headers with null values are copied. + */ + private Map sanitizeHeaders(MessageHeaders headers) { + Map sanitizedHeaders = new HashMap<>(); + headers.forEach((k, v) -> { + if (v != null) { + sanitizedHeaders.put(k, v); + } + }); + return sanitizedHeaders; + } + /* * */ @@ -649,6 +659,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect + this.functionDefinition + "' since it's input type is Void and as such it is treated as Supplier."); input = null; } + if (this.isSkipConversionHeaderSet(input, true)) { + return input; + } if (FunctionTypeUtils.isMultipleArgumentType(type)) { Type[] inputTypes = ((ParameterizedType) type).getActualTypeArguments(); @@ -690,6 +703,18 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect * set as a header in a message or explicitly provided as part of the lookup. */ private Object convertOutputIfNecessary(Object output, Type type, String[] contentType) { + if (this.isSkipConversionHeaderSet(output, false)) { + return output; + } + if (output instanceof Message && !this.containsRetainMessageSignalInHeaders((Message) output)) { + if (!FunctionTypeUtils.isMessage(type)) { + output = ((Message) output).getPayload(); + } + else if (FunctionTypeUtils.isMessage(type) && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(type))) { + output = ((Message) output).getPayload(); + } + } + if (!(output instanceof Publisher) && this.enhancer != null) { output = enhancer.apply(output); } @@ -714,6 +739,44 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect return convertedOutput; } + /* + * This header may be set by a framework that uses s-c-function but does not want to rely on type + * conversion mechanism provided by s-c-function + */ + private boolean isSkipConversionHeaderSet(Object value, boolean input) { + if (value instanceof Message) { + Message message = (Message) value; + String headerName = input ? FunctionProperties.SKIP_INPUT_CONVERSION_HEADER : FunctionProperties.SKIP_OUTPUT_CONVERSION_HEADER; + if (message.getHeaders().containsKey(headerName)) { + Object skipValue = message.getHeaders().get(headerName); + boolean skip = skipValue instanceof Boolean ? (boolean) skipValue : Boolean.parseBoolean((String) skipValue); + return skip; + } + } + return false; + } + + /** + * Will check if message contains any of the headers that are considered to serve as + * signals to retain output as Message (regardless of the output type of function). + * At this moment presence of 'scf-func-name' header or any header that begins with `lambda' + * (use by AWS) will result in this method returning true. + */ + /* + * TODO we need to investigate if this could be extracted into some type of strategy since at + * the pure core level there is no case for this to ever be true. In fact today it is only AWS Lambda + * case that requires it since it may contain forwarding url + */ + private boolean containsRetainMessageSignalInHeaders(Message message) { + for (String headerName : message.getHeaders().keySet()) { + if (headerName.startsWith("lambda") || + headerName.startsWith("scf-func-name")) { + return true; + } + } + return false; + } + /* * */ @@ -883,11 +946,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) { - Type actualType = type != null ? FunctionTypeUtils.getGenericType(type) : type; + //Type actualType = type;// != null ? FunctionTypeUtils.getGenericType(type) : type; return publisher instanceof Mono - ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, actualType, expectedOutputContentType)) + ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)) - : Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, actualType, expectedOutputContentType)) + : Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)); } } diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java index a2f75e1df..109701de8 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistryTests.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.context.catalog; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -97,10 +98,8 @@ public class SimpleFunctionRegistryTests { catalog.register(registration); FunctionInvocationWrapper lookedUpFunction = catalog.lookup("uppercase"); - Message message = MessageBuilder.withPayload("hello") - .setHeader("scf-sink-url", "blah") - .setHeader("scf-func-name", "blah") + .setHeader("lambda-runtime-aws-request-id", UUID.randomUUID()) .build(); Object result = lookedUpFunction.apply(message); assertThat(result).isInstanceOf(Message.class);