GH-764 Add support for output header enrichemnt

Resolves #764
This commit is contained in:
Oleg Zhurakousky
2021-11-08 15:26:11 +01:00
parent 5af3d14918
commit f4171cae16
5 changed files with 146 additions and 35 deletions

View File

@@ -26,6 +26,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
/**
*
@@ -88,16 +89,34 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
String propertyX = "spring.cloud.function.configuration." + entry.getKey() + ".input-header-mapping-expression.";
String propertyY = "spring.cloud.function.configuration." + entry.getKey() + ".inputHeaderMappingExpression.";
Map<String, Object> headerMapping = entry.getValue().getInputHeaderMappingExpression();
for (Object k : headerMapping.keySet()) {
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
Map<String, Object> originalMapping = entry.getValue().getInputHeaderMappingExpression();
Map current = entry.getValue().getInputHeaderMappingExpression();
if (current.containsKey("0")) {
((Map) current.get("0")).put(k, headerMapping.get(k));
if (!CollectionUtils.isEmpty(headerMapping)) {
for (Object k : headerMapping.keySet()) {
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
Map current = entry.getValue().getInputHeaderMappingExpression();
if (current.containsKey("0")) {
((Map) current.get("0")).put(k, headerMapping.get(k));
}
else {
entry.getValue().setInputHeaderMappingExpression(Collections.singletonMap("0", current));
break;
}
}
else {
entry.getValue().setInputHeaderMappingExpression(Collections.singletonMap("0", originalMapping));
break;
}
}
propertyX = "spring.cloud.function.configuration." + entry.getKey() + ".output-header-mapping-expression.";
propertyY = "spring.cloud.function.configuration." + entry.getKey() + ".outputHeaderMappingExpression.";
headerMapping = entry.getValue().getOutputHeaderMappingExpression();
if (!CollectionUtils.isEmpty(headerMapping)) {
for (Object k : headerMapping.keySet()) {
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
Map current = entry.getValue().getOutputHeaderMappingExpression();
if (current.containsKey("0")) {
((Map) current.get("0")).put(k, headerMapping.get(k));
}
else {
entry.getValue().setOutputHeaderMappingExpression(Collections.singletonMap("0", current));
break;
}
}
}
}
@@ -149,6 +168,8 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
private Map<String, Object> inputHeaderMappingExpression;
private Map<String, Object> outputHeaderMappingExpression;
public Map<String, Object> getInputHeaderMappingExpression() {
return inputHeaderMappingExpression;
}
@@ -157,5 +178,14 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
this.inputHeaderMappingExpression = inputHeaderMappingExpression;
}
public Map<String, Object> getOutputHeaderMappingExpression() {
return outputHeaderMappingExpression;
}
public void setOutputHeaderMappingExpression(
Map<String, Object> outputHeaderMappingExpression) {
this.outputHeaderMappingExpression = outputHeaderMappingExpression;
}
}
}

View File

@@ -42,9 +42,9 @@ import org.springframework.util.Assert;
* @since 3.1.3
*
*/
class InputEnricher implements Function<Object, Object> {
class HeaderEnricher implements Function<Object, Object> {
protected Log logger = LogFactory.getLog(InputEnricher.class);
protected Log logger = LogFactory.getLog(HeaderEnricher.class);
private final Map<String, Map<String, String>> headerExpressions;
@@ -53,7 +53,7 @@ class InputEnricher implements Function<Object, Object> {
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
@SuppressWarnings({ "rawtypes", "unchecked" })
InputEnricher(Map headerExpressions, @Nullable BeanResolver beanResolver) {
HeaderEnricher(Map headerExpressions, @Nullable BeanResolver beanResolver) {
Assert.notEmpty(headerExpressions, "'headerExpressions' must not be null or empty");
this.headerExpressions = headerExpressions;
this.evalContext.addPropertyAccessor(new MapAccessor());

View File

@@ -304,6 +304,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
composedFunction = (FunctionInvocationWrapper) composedFunction.andThen((Function<Object, Object>) andThenFunction);
}
composedFunction = this.enrichInputIfNecessary(composedFunction);
composedFunction = this.enrichOutputIfNecessary(composedFunction);
if (composedFunction.isSingleton) {
this.wrappedFunctionDefinitions.put(composedFunction.functionDefinition, composedFunction);
}
@@ -329,8 +330,32 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
BeanFactoryResolver beanResolver = this.functionProperties.getApplicationContext() != null
? new BeanFactoryResolver(this.functionProperties.getApplicationContext())
: null;
InputEnricher enricher = new InputEnricher(configuration.getInputHeaderMappingExpression(), beanResolver);
FunctionInvocationWrapper w = new FunctionInvocationWrapper("headerEnricher", enricher, Message.class, Message.class);
HeaderEnricher enricher = new HeaderEnricher(configuration.getInputHeaderMappingExpression(), beanResolver);
FunctionInvocationWrapper w = new FunctionInvocationWrapper("inputHeaderEnricher", enricher, Message.class, Message.class);
composedFunction = (FunctionInvocationWrapper) w.andThen((Function<Object, Object>) composedFunction);
composedFunction.functionDefinition = functionDefinition;
}
}
}
return composedFunction;
}
private FunctionInvocationWrapper enrichOutputIfNecessary(FunctionInvocationWrapper composedFunction) {
if (this.functionProperties == null) {
return composedFunction;
}
String functionDefinition = composedFunction.getFunctionDefinition();
Map<String, FunctionConfigurationProperties> configurationProperties = this.functionProperties.getConfiguration();
if (!CollectionUtils.isEmpty(configurationProperties)) {
FunctionConfigurationProperties configuration = configurationProperties
.get(functionDefinition.replace("|", "").replace(",", ""));
if (configuration != null) {
if (!CollectionUtils.isEmpty(configuration.getOutputHeaderMappingExpression())) {
BeanFactoryResolver beanResolver = this.functionProperties.getApplicationContext() != null
? new BeanFactoryResolver(this.functionProperties.getApplicationContext())
: null;
HeaderEnricher enricher = new HeaderEnricher(configuration.getOutputHeaderMappingExpression(), beanResolver);
FunctionInvocationWrapper w = new FunctionInvocationWrapper("outputHeaderEnricher", enricher, Message.class, Message.class);
composedFunction = (FunctionInvocationWrapper) w.andThen((Function<Object, Object>) composedFunction);
composedFunction.functionDefinition = functionDefinition;
}
@@ -1050,7 +1075,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
if (this.isWrapConvertedInputInMessage(convertedInput)) {
convertedInput = MessageBuilder.withPayload(convertedInput).build();
}
Assert.notNull(convertedInput, () -> "Failed to convert input: " + input + " to " + type);
Object finalInput = input;
Assert.notNull(convertedInput, () -> "Failed to convert input: " + finalInput + " to " + type);
return convertedInput;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,8 +33,8 @@ import org.springframework.messaging.support.MessageBuilder;
import static org.assertj.core.api.Assertions.assertThat;
//NOTE!!! assertions for all tests are in 'echo' function since we're validating what's coming into it.
public class InputHeaderMappingTests {
//NOTE!!! assertions for input in all tests are in 'echo' function since we're validating what's coming into it.
public class HeaderMappingTests {
@Test
public void testErrorWarnAndContinue() throws Exception {
@@ -173,6 +173,52 @@ public class InputHeaderMappingTests {
}
}
@SuppressWarnings("unchecked")
@Test
public void testOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.foo.output-header-mapping-expression.key1='hello1'",
"--spring.cloud.function.configuration.foo.output-header-mapping-expression.key2=headers.contentType")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("foo");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build());
assertThat(result.getHeaders().containsKey("key1")).isTrue();
assertThat(result.getHeaders().get("key1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("key2")).isTrue();
assertThat(result.getHeaders().get("key2")).isEqualTo("application/json");
}
}
@SuppressWarnings("unchecked")
@Test
public void testMixedInputOutputHeaderMapping() throws Exception {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
"--logging.level.org.springframework.cloud.function=DEBUG",
"--spring.main.lazy-initialization=true",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
FunctionInvocationWrapper function = functionCatalog.lookup("split");
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
.setHeader("path", "foo/bar/baz").build());
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
}
}
@EnableAutoConfiguration
@Configuration
protected static class SampleFunctionConfiguration {