GH-943 Enhance AWS FunctionInvoker
This adds another constructor which allows function.definition to be passed as a constructor argument rather then rely on the property Resolves #943
This commit is contained in:
@@ -40,6 +40,7 @@ import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.FunctionProperties;
|
||||
import org.springframework.cloud.function.context.FunctionalSpringApplication;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.context.config.RoutingFunction;
|
||||
@@ -52,7 +53,6 @@ import org.springframework.core.env.Environment;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StreamUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
@@ -72,10 +72,17 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
|
||||
private FunctionInvocationWrapper function;
|
||||
|
||||
public FunctionInvoker() {
|
||||
private volatile String functionDefinition;
|
||||
|
||||
public FunctionInvoker(String functionDefinition) {
|
||||
this.functionDefinition = functionDefinition;
|
||||
this.start();
|
||||
}
|
||||
|
||||
public FunctionInvoker() {
|
||||
this(null);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes" })
|
||||
@Override
|
||||
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
|
||||
@@ -130,7 +137,10 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
: SpringApplication.run(new Class[] {startClass, AWSCompanionAutoConfiguration.class}, properties);
|
||||
|
||||
Environment environment = context.getEnvironment();
|
||||
String functionName = environment.getProperty("spring.cloud.function.definition");
|
||||
if (!StringUtils.hasText(this.functionDefinition)) {
|
||||
this.functionDefinition = environment.getProperty(FunctionProperties.FUNCTION_DEFINITION);
|
||||
}
|
||||
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
this.jsonMapper = context.getBean(JsonMapper.class);
|
||||
if (this.jsonMapper instanceof JacksonMapper) {
|
||||
@@ -154,15 +164,18 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
}
|
||||
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Locating function: '" + functionName + "'");
|
||||
logger.info("Locating function: '" + this.functionDefinition + "'");
|
||||
}
|
||||
|
||||
this.function = functionCatalog.lookup(functionName, "application/json");
|
||||
|
||||
Set<String> names = functionCatalog.getNames(null);
|
||||
if (this.function == null && !CollectionUtils.isEmpty(names)) {
|
||||
this.function = functionCatalog.lookup(this.functionDefinition, "application/json");
|
||||
|
||||
if (this.function == null) {
|
||||
if (logger.isInfoEnabled()) {
|
||||
if (!StringUtils.hasText(this.functionDefinition)) {
|
||||
logger.info("Failed to determine default function. Please use 'spring.cloud.function.definition' property "
|
||||
+ "or pass function definition as a constructir argument to this FunctionInvoker");
|
||||
}
|
||||
Set<String> names = functionCatalog.getNames(null);
|
||||
if (names.size() == 1) {
|
||||
logger.info("Will default to RoutingFunction, since it is the only function available in FunctionCatalog."
|
||||
+ "Expecting 'spring.cloud.function.definition' or 'spring.cloud.function.routing-expression' as Message headers. "
|
||||
@@ -181,14 +194,11 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
if (this.function.isOutputTypePublisher()) {
|
||||
this.function.setSkipOutputConversion(true);
|
||||
}
|
||||
Assert.notNull(this.function, "Failed to lookup function " + functionName);
|
||||
|
||||
if (!StringUtils.hasText(functionName)) {
|
||||
functionName = this.function.getFunctionDefinition();
|
||||
}
|
||||
Assert.notNull(this.function, "Failed to lookup function " + this.functionDefinition);
|
||||
|
||||
this.functionDefinition = this.function.getFunctionDefinition();
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Located function: '" + functionName + "'");
|
||||
logger.info("Located function: '" + this.functionDefinition + "'");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -578,6 +578,19 @@ public class FunctionInvokerTests {
|
||||
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSQSEventWithConstructorArg() throws Exception {
|
||||
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
|
||||
FunctionInvoker invoker = new FunctionInvoker("inputSQSEvent");
|
||||
|
||||
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
invoker.handleRequest(targetStream, output, null);
|
||||
|
||||
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
|
||||
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSQSEventAsMessage() throws Exception {
|
||||
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
|
||||
|
||||
Reference in New Issue
Block a user