GH-791 Add support for propagating input headers

Resolves #791

polishing merge
This commit is contained in:
Oleg Zhurakousky
2022-05-04 13:20:01 +02:00
parent 32c5873fd4
commit 3bfa5faa44
5 changed files with 101 additions and 3 deletions

View File

@@ -170,6 +170,8 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
private Map<String, Object> outputHeaderMappingExpression;
private boolean copyInputHeaders;
public Map<String, Object> getInputHeaderMappingExpression() {
return inputHeaderMappingExpression;
}
@@ -187,5 +189,13 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
this.outputHeaderMappingExpression = outputHeaderMappingExpression;
}
public boolean isCopyInputHeaders() {
return copyInputHeaders;
}
public void setCopyInputHeaders(boolean copyInputHeaders) {
this.copyInputHeaders = copyInputHeaders;
}
}
}

View File

@@ -401,6 +401,8 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
private boolean isSingleton = true;
private boolean propagateInputHeaders;
/*
* This is primarily to support Stream's ability to access
* un-converted payload (e.g., to evaluate expression on some attribute of a payload)
@@ -426,6 +428,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
this.outputType = this.normalizeType(outputType);
this.functionDefinition = functionDefinition;
this.message = this.inputType != null && FunctionTypeUtils.isMessage(this.inputType);
if (functionProperties != null) {
Map<String, FunctionConfigurationProperties> funcConfiguration = functionProperties.getConfiguration();
if (!CollectionUtils.isEmpty(funcConfiguration)) {
FunctionConfigurationProperties configuration = funcConfiguration.get(functionDefinition);
if (configuration != null) {
propagateInputHeaders = configuration.isCopyInputHeaders();
}
}
}
}
public boolean isSkipOutputConversion() {
@@ -1087,6 +1098,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
}
private boolean isExtractPayload(Message<?> message, Type type) {
if (this.propagateInputHeaders) {
return false;
}
if (this.isRoutingFunction()) {
return false;
}

View File

@@ -48,9 +48,14 @@ public class SmartCompositeMessageConverter extends CompositeMessageConverter {
if (!(message.getPayload() instanceof byte[]) && targetClass.isInstance(message.getPayload()) && !(message.getPayload() instanceof Collection<?>)) {
return message.getPayload();
}
Object result = converter.fromMessage(message, targetClass);
if (result != null) {
return result;
try {
Object result = converter.fromMessage(message, targetClass);
if (result != null) {
return result;
}
}
catch (Exception e) {
// ignore
}
}
return null;

View File

@@ -57,6 +57,7 @@ import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.lang.Nullable;
@@ -677,6 +678,29 @@ public class BeanFactoryAwareFunctionRegistryTests {
assertThat(result).startsWith("{date=");
}
@Test
public void test_791() {
try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class)
.run("--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true")) {
FunctionCatalog catalog = ac.getBean(FunctionCatalog.class);
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).doesNotContainKey("foo");
}
try (ConfigurableApplicationContext ac = new SpringApplicationBuilder(InputHeaderPropagationConfiguration.class)
.run("--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.uppercase.copy-input-headers=true")) {
FunctionCatalog catalog = ac.getBean(FunctionCatalog.class);
Function<Message<String>, Message<byte[]>> uppercase = catalog.lookup("uppercase", "application/json");
Message<byte[]> result = uppercase.apply(MessageBuilder.withPayload("bob").setHeader("foo", "bar").build());
assertThat(result.getHeaders()).containsKey("foo");
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testArrayPayloadOnFluxFunction() throws Exception {
@@ -928,6 +952,16 @@ public class BeanFactoryAwareFunctionRegistryTests {
}
}
@EnableAutoConfiguration
@Configuration
protected static class InputHeaderPropagationConfiguration {
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
}
@EnableAutoConfiguration
@Configuration
protected static class SampleFunctionConfiguration {