@@ -26,6 +26,7 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
import org.springframework.context.EnvironmentAware;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
*
|
||||
@@ -88,16 +89,34 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
String propertyX = "spring.cloud.function.configuration." + entry.getKey() + ".input-header-mapping-expression.";
|
||||
String propertyY = "spring.cloud.function.configuration." + entry.getKey() + ".inputHeaderMappingExpression.";
|
||||
Map<String, Object> headerMapping = entry.getValue().getInputHeaderMappingExpression();
|
||||
for (Object k : headerMapping.keySet()) {
|
||||
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
|
||||
Map<String, Object> originalMapping = entry.getValue().getInputHeaderMappingExpression();
|
||||
Map current = entry.getValue().getInputHeaderMappingExpression();
|
||||
if (current.containsKey("0")) {
|
||||
((Map) current.get("0")).put(k, headerMapping.get(k));
|
||||
if (!CollectionUtils.isEmpty(headerMapping)) {
|
||||
for (Object k : headerMapping.keySet()) {
|
||||
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
|
||||
Map current = entry.getValue().getInputHeaderMappingExpression();
|
||||
if (current.containsKey("0")) {
|
||||
((Map) current.get("0")).put(k, headerMapping.get(k));
|
||||
}
|
||||
else {
|
||||
entry.getValue().setInputHeaderMappingExpression(Collections.singletonMap("0", current));
|
||||
break;
|
||||
}
|
||||
}
|
||||
else {
|
||||
entry.getValue().setInputHeaderMappingExpression(Collections.singletonMap("0", originalMapping));
|
||||
break;
|
||||
}
|
||||
}
|
||||
propertyX = "spring.cloud.function.configuration." + entry.getKey() + ".output-header-mapping-expression.";
|
||||
propertyY = "spring.cloud.function.configuration." + entry.getKey() + ".outputHeaderMappingExpression.";
|
||||
headerMapping = entry.getValue().getOutputHeaderMappingExpression();
|
||||
if (!CollectionUtils.isEmpty(headerMapping)) {
|
||||
for (Object k : headerMapping.keySet()) {
|
||||
if (this.environment.containsProperty(propertyX + k) || this.environment.containsProperty(propertyY + k)) {
|
||||
Map current = entry.getValue().getOutputHeaderMappingExpression();
|
||||
if (current.containsKey("0")) {
|
||||
((Map) current.get("0")).put(k, headerMapping.get(k));
|
||||
}
|
||||
else {
|
||||
entry.getValue().setOutputHeaderMappingExpression(Collections.singletonMap("0", current));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -149,6 +168,8 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
|
||||
private Map<String, Object> inputHeaderMappingExpression;
|
||||
|
||||
private Map<String, Object> outputHeaderMappingExpression;
|
||||
|
||||
public Map<String, Object> getInputHeaderMappingExpression() {
|
||||
return inputHeaderMappingExpression;
|
||||
}
|
||||
@@ -157,5 +178,14 @@ public class FunctionProperties implements EnvironmentAware, ApplicationContextA
|
||||
this.inputHeaderMappingExpression = inputHeaderMappingExpression;
|
||||
}
|
||||
|
||||
public Map<String, Object> getOutputHeaderMappingExpression() {
|
||||
return outputHeaderMappingExpression;
|
||||
}
|
||||
|
||||
public void setOutputHeaderMappingExpression(
|
||||
Map<String, Object> outputHeaderMappingExpression) {
|
||||
this.outputHeaderMappingExpression = outputHeaderMappingExpression;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,9 +42,9 @@ import org.springframework.util.Assert;
|
||||
* @since 3.1.3
|
||||
*
|
||||
*/
|
||||
class InputEnricher implements Function<Object, Object> {
|
||||
class HeaderEnricher implements Function<Object, Object> {
|
||||
|
||||
protected Log logger = LogFactory.getLog(InputEnricher.class);
|
||||
protected Log logger = LogFactory.getLog(HeaderEnricher.class);
|
||||
|
||||
private final Map<String, Map<String, String>> headerExpressions;
|
||||
|
||||
@@ -53,7 +53,7 @@ class InputEnricher implements Function<Object, Object> {
|
||||
private final StandardEvaluationContext evalContext = new StandardEvaluationContext();
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
InputEnricher(Map headerExpressions, @Nullable BeanResolver beanResolver) {
|
||||
HeaderEnricher(Map headerExpressions, @Nullable BeanResolver beanResolver) {
|
||||
Assert.notEmpty(headerExpressions, "'headerExpressions' must not be null or empty");
|
||||
this.headerExpressions = headerExpressions;
|
||||
this.evalContext.addPropertyAccessor(new MapAccessor());
|
||||
@@ -304,6 +304,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
composedFunction = (FunctionInvocationWrapper) composedFunction.andThen((Function<Object, Object>) andThenFunction);
|
||||
}
|
||||
composedFunction = this.enrichInputIfNecessary(composedFunction);
|
||||
composedFunction = this.enrichOutputIfNecessary(composedFunction);
|
||||
if (composedFunction.isSingleton) {
|
||||
this.wrappedFunctionDefinitions.put(composedFunction.functionDefinition, composedFunction);
|
||||
}
|
||||
@@ -329,8 +330,32 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
BeanFactoryResolver beanResolver = this.functionProperties.getApplicationContext() != null
|
||||
? new BeanFactoryResolver(this.functionProperties.getApplicationContext())
|
||||
: null;
|
||||
InputEnricher enricher = new InputEnricher(configuration.getInputHeaderMappingExpression(), beanResolver);
|
||||
FunctionInvocationWrapper w = new FunctionInvocationWrapper("headerEnricher", enricher, Message.class, Message.class);
|
||||
HeaderEnricher enricher = new HeaderEnricher(configuration.getInputHeaderMappingExpression(), beanResolver);
|
||||
FunctionInvocationWrapper w = new FunctionInvocationWrapper("inputHeaderEnricher", enricher, Message.class, Message.class);
|
||||
composedFunction = (FunctionInvocationWrapper) w.andThen((Function<Object, Object>) composedFunction);
|
||||
composedFunction.functionDefinition = functionDefinition;
|
||||
}
|
||||
}
|
||||
}
|
||||
return composedFunction;
|
||||
}
|
||||
|
||||
private FunctionInvocationWrapper enrichOutputIfNecessary(FunctionInvocationWrapper composedFunction) {
|
||||
if (this.functionProperties == null) {
|
||||
return composedFunction;
|
||||
}
|
||||
String functionDefinition = composedFunction.getFunctionDefinition();
|
||||
Map<String, FunctionConfigurationProperties> configurationProperties = this.functionProperties.getConfiguration();
|
||||
if (!CollectionUtils.isEmpty(configurationProperties)) {
|
||||
FunctionConfigurationProperties configuration = configurationProperties
|
||||
.get(functionDefinition.replace("|", "").replace(",", ""));
|
||||
if (configuration != null) {
|
||||
if (!CollectionUtils.isEmpty(configuration.getOutputHeaderMappingExpression())) {
|
||||
BeanFactoryResolver beanResolver = this.functionProperties.getApplicationContext() != null
|
||||
? new BeanFactoryResolver(this.functionProperties.getApplicationContext())
|
||||
: null;
|
||||
HeaderEnricher enricher = new HeaderEnricher(configuration.getOutputHeaderMappingExpression(), beanResolver);
|
||||
FunctionInvocationWrapper w = new FunctionInvocationWrapper("outputHeaderEnricher", enricher, Message.class, Message.class);
|
||||
composedFunction = (FunctionInvocationWrapper) w.andThen((Function<Object, Object>) composedFunction);
|
||||
composedFunction.functionDefinition = functionDefinition;
|
||||
}
|
||||
@@ -1050,7 +1075,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
if (this.isWrapConvertedInputInMessage(convertedInput)) {
|
||||
convertedInput = MessageBuilder.withPayload(convertedInput).build();
|
||||
}
|
||||
Assert.notNull(convertedInput, () -> "Failed to convert input: " + input + " to " + type);
|
||||
|
||||
Object finalInput = input;
|
||||
Assert.notNull(convertedInput, () -> "Failed to convert input: " + finalInput + " to " + type);
|
||||
return convertedInput;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2019-2021 the original author or authors.
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -33,8 +33,8 @@ import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
//NOTE!!! assertions for all tests are in 'echo' function since we're validating what's coming into it.
|
||||
public class InputHeaderMappingTests {
|
||||
//NOTE!!! assertions for input in all tests are in 'echo' function since we're validating what's coming into it.
|
||||
public class HeaderMappingTests {
|
||||
|
||||
@Test
|
||||
public void testErrorWarnAndContinue() throws Exception {
|
||||
@@ -173,6 +173,52 @@ public class InputHeaderMappingTests {
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testOutputHeaderMapping() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.main.lazy-initialization=true",
|
||||
"--spring.cloud.function.configuration.foo.output-header-mapping-expression.key1='hello1'",
|
||||
"--spring.cloud.function.configuration.foo.output-header-mapping-expression.key2=headers.contentType")) {
|
||||
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup("foo");
|
||||
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build());
|
||||
assertThat(result.getHeaders().containsKey("key1")).isTrue();
|
||||
assertThat(result.getHeaders().get("key1")).isEqualTo("hello1");
|
||||
assertThat(result.getHeaders().containsKey("key2")).isTrue();
|
||||
assertThat(result.getHeaders().get("key2")).isEqualTo("application/json");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test
|
||||
public void testMixedInputOutputHeaderMapping() throws Exception {
|
||||
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
|
||||
SampleFunctionConfiguration.class).web(WebApplicationType.NONE).run(
|
||||
"--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
"--spring.main.lazy-initialization=true",
|
||||
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut1='hello1'",
|
||||
"--spring.cloud.function.configuration.split.output-header-mapping-expression.keyOut2=headers.contentType",
|
||||
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key1=headers.path.split('/')[0]",
|
||||
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key2=headers.path.split('/')[1]",
|
||||
"--spring.cloud.function.configuration.split.input-header-mapping-expression.key3=headers.path")) {
|
||||
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup("split");
|
||||
Message<byte[]> result = (Message<byte[]>) function.apply(MessageBuilder.withPayload("helo")
|
||||
.setHeader(MessageHeaders.CONTENT_TYPE, "application/json")
|
||||
.setHeader("path", "foo/bar/baz").build());
|
||||
assertThat(result.getHeaders().containsKey("keyOut1")).isTrue();
|
||||
assertThat(result.getHeaders().get("keyOut1")).isEqualTo("hello1");
|
||||
assertThat(result.getHeaders().containsKey("keyOut2")).isTrue();
|
||||
assertThat(result.getHeaders().get("keyOut2")).isEqualTo("application/json");
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
protected static class SampleFunctionConfiguration {
|
||||
Reference in New Issue
Block a user