From 3bfa5faa441358401f13f2d347e3f23f488fcb42 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 4 May 2022 13:20:01 +0200 Subject: [PATCH] GH-791 Add support for propagating input headers Resolves #791 polishing merge --- .../main/asciidoc/spring-cloud-function.adoc | 35 +++++++++++++++++++ .../function/context/FunctionProperties.java | 10 ++++++ .../catalog/SimpleFunctionRegistry.java | 14 ++++++++ .../SmartCompositeMessageConverter.java | 11 ++++-- ...BeanFactoryAwareFunctionRegistryTests.java | 34 ++++++++++++++++++ 5 files changed, 101 insertions(+), 3 deletions(-) diff --git a/docs/src/main/asciidoc/spring-cloud-function.adoc b/docs/src/main/asciidoc/spring-cloud-function.adoc index f53e497aa..e201ed0d7 100644 --- a/docs/src/main/asciidoc/spring-cloud-function.adoc +++ b/docs/src/main/asciidoc/spring-cloud-function.adoc @@ -428,6 +428,41 @@ IMPORTANT: IMPORTANT: At the moment, function arity is *only* supported for reac where evaluation and computation on confluence of events typically requires view into a stream of events rather than single event. +=== Input Header propagation + +In a typical scenario input Message headers are not propagated to output and rightfully so, since the output of a function may be an input to something else requiring it's own set of Message headers. +However, there are times when such propagation may be necessary so Spring Cloud Function provides several mechanisms to accomplish this. + +First you can always copy headers manually. For example, if you have a Function with the signature that takes `Message` and returns `Message` (i.e., `Function`), you can simply and selectively copy headers yourselves. Remember, if your function returns Message, the framework will not do anything to it other then properly converting its payload. +However, such approach may prove to be a bit tedious, especially in cases when you simply want to copy all headers. +To assist with cases like this we provide a simple property that would allow you to set a boolean flag on a function where you want input headers to be propagated. +The property is `copy-input-headers`. + +For example, let's assume you have the following configuration: + +[source, java] +---- +@EnableAutoConfiguration +@Configuration +protected static class InputHeaderPropagationConfiguration { + + @Bean + public Function uppercase() { + return x -> x.toUpperCase(); + } +} +---- + +As you know you can still invoke this function by sending a Message to it (framework will take care of type conversion and payload extraction) + +By simply setting `spring.cloud.function.configuration.uppercase.copy-input-headers` to `true`, the following assertion will be true as well + +---- +Function, Message> uppercase = catalog.lookup("uppercase", "application/json"); +Message result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); +assertThat(result.getHeaders()).containsKey("foo"); +---- + === Type conversion (Content-Type negotiation) Content-Type negotiation is one of the core features of Spring Cloud Function as it allows to not only transform the incoming data to the types declared diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java index 12f49a1e8..74702bbd6 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/FunctionProperties.java @@ -170,6 +170,8 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA private Map outputHeaderMappingExpression; + private boolean copyInputHeaders; + public Map getInputHeaderMappingExpression() { return inputHeaderMappingExpression; } @@ -187,5 +189,13 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA this.outputHeaderMappingExpression = outputHeaderMappingExpression; } + public boolean isCopyInputHeaders() { + return copyInputHeaders; + } + + public void setCopyInputHeaders(boolean copyInputHeaders) { + this.copyInputHeaders = copyInputHeaders; + } + } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index a7712936e..d5919e143 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -401,6 +401,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry { private boolean isSingleton = true; + private boolean propagateInputHeaders; + /* * This is primarily to support Stream's ability to access * un-converted payload (e.g., to evaluate expression on some attribute of a payload) @@ -426,6 +428,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry { this.outputType = this.normalizeType(outputType); this.functionDefinition = functionDefinition; this.message = this.inputType != null && FunctionTypeUtils.isMessage(this.inputType); + if (functionProperties != null) { + Map funcConfiguration = functionProperties.getConfiguration(); + if (!CollectionUtils.isEmpty(funcConfiguration)) { + FunctionConfigurationProperties configuration = funcConfiguration.get(functionDefinition); + if (configuration != null) { + propagateInputHeaders = configuration.isCopyInputHeaders(); + } + } + } } public boolean isSkipOutputConversion() { @@ -1087,6 +1098,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry { } private boolean isExtractPayload(Message message, Type type) { + if (this.propagateInputHeaders) { + return false; + } if (this.isRoutingFunction()) { return false; } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java index 5b62325f1..7e49c9bf5 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/SmartCompositeMessageConverter.java @@ -48,9 +48,14 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter { if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection)) { return message.getPayload(); } - Object result = converter.fromMessage(message, targetClass); - if (result != null) { - return result; + try { + Object result = converter.fromMessage(message, targetClass); + if (result != null) { + return result; + } + } + catch (Exception e) { + // ignore } } return null; diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index ca4540ee8..09c461fa8 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -57,6 +57,7 @@ import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.ApplicationContext; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.lang.Nullable; @@ -677,6 +678,29 @@ public class BeanFactoryAwareFunctionRegistryTests { assertThat(result).startsWith("{date="); } + @Test + public void test_791() { + try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true")) { + FunctionCatalog catalog = ac.getBean(FunctionCatalog.class); + + Function, Message> uppercase = catalog.lookup("uppercase", "application/json"); + Message result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); + assertThat(result.getHeaders()).doesNotContainKey("foo"); + } + try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class) + .run("--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true", + "--spring.cloud.function.configuration.uppercase.copy-input-headers=true")) { + FunctionCatalog catalog = ac.getBean(FunctionCatalog.class); + + Function, Message> uppercase = catalog.lookup("uppercase", "application/json"); + Message result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build()); + assertThat(result.getHeaders()).containsKey("foo"); + } + } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void testArrayPayloadOnFluxFunction() throws Exception { @@ -928,6 +952,16 @@ public class BeanFactoryAwareFunctionRegistryTests { } } + @EnableAutoConfiguration + @Configuration + protected static class InputHeaderPropagationConfiguration { + + @Bean + public Function uppercase() { + return x -> x.toUpperCase(); + } + } + @EnableAutoConfiguration @Configuration protected static class SampleFunctionConfiguration {