@@ -428,6 +428,41 @@ IMPORTANT: IMPORTANT: At the moment, function arity is *only* supported for reac
|
||||
where evaluation and computation on confluence of events typically requires view into a
|
||||
stream of events rather than single event.
|
||||
|
||||
=== Input Header propagation
|
||||
|
||||
In a typical scenario input Message headers are not propagated to output and rightfully so, since the output of a function may be an input to something else requiring it's own set of Message headers.
|
||||
However, there are times when such propagation may be necessary so Spring Cloud Function provides several mechanisms to accomplish this.
|
||||
|
||||
First you can always copy headers manually. For example, if you have a Function with the signature that takes `Message` and returns `Message` (i.e., `Function<Message, Message>`), you can simply and selectively copy headers yourselves. Remember, if your function returns Message, the framework will not do anything to it other then properly converting its payload.
|
||||
However, such approach may prove to be a bit tedious, especially in cases when you simply want to copy all headers.
|
||||
To assist with cases like this we provide a simple property that would allow you to set a boolean flag on a function where you want input headers to be propagated.
|
||||
The property is `copy-input-headers`.
|
||||
|
||||
For example, let's assume you have the following configuration:
|
||||
|
||||
[source, java]
|
||||
----
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class InputHeaderPropagationConfiguration {
|
||||
|
||||
@Bean
|
||||
public Function<String, String> uppercase() {
|
||||
return x -> x.toUpperCase();
|
||||
}
|
||||
}
|
||||
----
|
||||
|
||||
As you know you can still invoke this function by sending a Message to it (framework will take care of type conversion and payload extraction)
|
||||
|
||||
By simply setting `spring.cloud.function.configuration.uppercase.copy-input-headers` to `true`, the following assertion will be true as well
|
||||
|
||||
----
|
||||
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
|
||||
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
|
||||
assertThat(result.getHeaders()).containsKey("foo");
|
||||
----
|
||||
|
||||
=== Type conversion (Content-Type negotiation)
|
||||
|
||||
Content-Type negotiation is one of the core features of Spring Cloud Function as it allows to not only transform the incoming data to the types declared
|
||||
|
||||
@@ -170,6 +170,8 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
|
||||
private Map<String, Object> outputHeaderMappingExpression;
|
||||
|
||||
private boolean copyInputHeaders;
|
||||
|
||||
public Map<String, Object> getInputHeaderMappingExpression() {
|
||||
return inputHeaderMappingExpression;
|
||||
}
|
||||
@@ -187,5 +189,13 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
this.outputHeaderMappingExpression = outputHeaderMappingExpression;
|
||||
}
|
||||
|
||||
public boolean isCopyInputHeaders() {
|
||||
return copyInputHeaders;
|
||||
}
|
||||
|
||||
public void setCopyInputHeaders(boolean copyInputHeaders) {
|
||||
this.copyInputHeaders = copyInputHeaders;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -408,6 +408,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
|
||||
private boolean isSingleton = true;
|
||||
|
||||
private boolean propagateInputHeaders;
|
||||
|
||||
/*
|
||||
* This is primarily to support Stream's ability to access
|
||||
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
|
||||
@@ -433,6 +435,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
this.outputType = this.normalizeType(outputType);
|
||||
this.functionDefinition = functionDefinition;
|
||||
this.message = this.inputType != null && FunctionTypeUtils.isMessage(this.inputType);
|
||||
if (functionProperties != null) {
|
||||
Map<String, FunctionConfigurationProperties> funcConfiguration = functionProperties.getConfiguration();
|
||||
if (!CollectionUtils.isEmpty(funcConfiguration)) {
|
||||
FunctionConfigurationProperties configuration = funcConfiguration.get(functionDefinition);
|
||||
if (configuration != null) {
|
||||
propagateInputHeaders = configuration.isCopyInputHeaders();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isSkipOutputConversion() {
|
||||
@@ -1091,6 +1102,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
}
|
||||
|
||||
private boolean isExtractPayload(Message<?> message, Type type) {
|
||||
if (this.propagateInputHeaders) {
|
||||
return false;
|
||||
}
|
||||
if (this.isRoutingFunction()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ import org.springframework.cloud.function.context.FunctionType;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.lang.Nullable;
|
||||
@@ -673,6 +674,29 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
assertThat(result).startsWith("{date=");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test_791() {
|
||||
try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.main.lazy-initialization=true")) {
|
||||
FunctionCatalog catalog = ac.getBean(FunctionCatalog.class);
|
||||
|
||||
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
|
||||
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
|
||||
assertThat(result.getHeaders()).doesNotContainKey("foo");
|
||||
}
|
||||
try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.main.lazy-initialization=true",
|
||||
"--spring.cloud.function.configuration.uppercase.copy-input-headers=true")) {
|
||||
FunctionCatalog catalog = ac.getBean(FunctionCatalog.class);
|
||||
|
||||
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
|
||||
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
|
||||
assertThat(result.getHeaders()).containsKey("foo");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
@Test
|
||||
public void testArrayPayloadOnFluxFunction() throws Exception {
|
||||
@@ -924,6 +948,16 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class InputHeaderPropagationConfiguration {
|
||||
|
||||
@Bean
|
||||
public Function<String, String> uppercase() {
|
||||
return x -> x.toUpperCase();
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class SampleFunctionConfiguration {
|
||||
|
||||
Reference in New Issue
Block a user