From a2dd8089ccbec12cc2b76fb3ebe23d46062b9b31 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 2 Nov 2022 09:35:38 +0100 Subject: [PATCH] GH-943 Enhance AWS FunctionInvoker This adds another constructor which allows function.definition to be passed as a constructor argument rather then rely on the property Resolves #943 --- .../function/adapter/aws/FunctionInvoker.java | 38 ++++++++++++------- .../adapter/aws/FunctionInvokerTests.java | 13 +++++++ 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index fcdb2bbf2..183b082f1 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -40,6 +40,7 @@ import reactor.core.publisher.Flux; import org.springframework.boot.SpringApplication; import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionProperties; import org.springframework.cloud.function.context.FunctionalSpringApplication; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.context.config.RoutingFunction; @@ -52,7 +53,6 @@ import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; import org.springframework.util.StreamUtils; import org.springframework.util.StringUtils; @@ -72,10 +72,17 @@ public class FunctionInvoker implements RequestStreamHandler { private FunctionInvocationWrapper function; - public FunctionInvoker() { + private volatile String functionDefinition; + + public FunctionInvoker(String functionDefinition) { + this.functionDefinition = functionDefinition; this.start(); } + public FunctionInvoker() { + this(null); + } + @SuppressWarnings({ "rawtypes" }) @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { @@ -130,7 +137,10 @@ public class FunctionInvoker implements RequestStreamHandler { : SpringApplication.run(new Class[] {startClass, AWSCompanionAutoConfiguration.class}, properties); Environment environment = context.getEnvironment(); - String functionName = environment.getProperty("spring.cloud.function.definition"); + if (!StringUtils.hasText(this.functionDefinition)) { + this.functionDefinition = environment.getProperty(FunctionProperties.FUNCTION_DEFINITION); + } + FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class); this.jsonMapper = context.getBean(JsonMapper.class); if (this.jsonMapper instanceof JacksonMapper) { @@ -154,15 +164,18 @@ public class FunctionInvoker implements RequestStreamHandler { } if (logger.isInfoEnabled()) { - logger.info("Locating function: '" + functionName + "'"); + logger.info("Locating function: '" + this.functionDefinition + "'"); } - this.function = functionCatalog.lookup(functionName, "application/json"); - - Set names = functionCatalog.getNames(null); - if (this.function == null && !CollectionUtils.isEmpty(names)) { + this.function = functionCatalog.lookup(this.functionDefinition, "application/json"); + if (this.function == null) { if (logger.isInfoEnabled()) { + if (!StringUtils.hasText(this.functionDefinition)) { + logger.info("Failed to determine default function. Please use 'spring.cloud.function.definition' property " + + "or pass function definition as a constructir argument to this FunctionInvoker"); + } + Set names = functionCatalog.getNames(null); if (names.size() == 1) { logger.info("Will default to RoutingFunction, since it is the only function available in FunctionCatalog." + "Expecting 'spring.cloud.function.definition' or 'spring.cloud.function.routing-expression' as Message headers. " @@ -181,14 +194,11 @@ public class FunctionInvoker implements RequestStreamHandler { if (this.function.isOutputTypePublisher()) { this.function.setSkipOutputConversion(true); } - Assert.notNull(this.function, "Failed to lookup function " + functionName); - - if (!StringUtils.hasText(functionName)) { - functionName = this.function.getFunctionDefinition(); - } + Assert.notNull(this.function, "Failed to lookup function " + this.functionDefinition); + this.functionDefinition = this.function.getFunctionDefinition(); if (logger.isInfoEnabled()) { - logger.info("Located function: '" + functionName + "'"); + logger.info("Located function: '" + this.functionDefinition + "'"); } } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index c4f5cf632..9dba9ec95 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -578,6 +578,19 @@ public class FunctionInvokerTests { assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); } + @Test + public void testSQSEventWithConstructorArg() throws Exception { + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + FunctionInvoker invoker = new FunctionInvoker("inputSQSEvent"); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); + } + @Test public void testSQSEventAsMessage() throws Exception { System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());