GH-698 Fix routing function support in AWS

Introduce auto-fallback to routing function if more then one function is present and no definition is provided.
Add explicit routing sample

Add documentation
This commit is contained in:
Oleg Zhurakousky
2021-05-19 15:22:57 +02:00
parent 86e141c199
commit 039387d9c6
19 changed files with 721 additions and 10 deletions

View File

@@ -47,8 +47,7 @@ public class CustomRuntimeInitializer implements ApplicationContextInitializer<G
CommandLineRunner.class, () -> args -> CustomRuntimeEventLoop.eventLoop(context));
}
}
else
if (ContextFunctionCatalogInitializer.enabled
else if (ContextFunctionCatalogInitializer.enabled
&& context.getEnvironment().getProperty("spring.functional.enabled", Boolean.class, false)) {
if (context.getBeanFactory().getBeanNamesForType(DestinationResolver.class, false, false).length == 0) {
context.registerBean(LambdaDestinationResolver.class, () -> new LambdaDestinationResolver());

View File

@@ -35,6 +35,7 @@ import reactor.core.publisher.Flux;
import org.springframework.boot.SpringApplication;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.utils.FunctionClassUtils;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.Environment;
@@ -42,6 +43,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
@@ -115,7 +117,6 @@ public class FunctionInvoker implements RequestStreamHandler {
}
private void start() {
System.out.println(FunctionClassUtils.getStartClass().getName());
ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass(), "--spring.main.web-application-type=none");
Environment environment = context.getEnvironment();
String functionName = environment.getProperty("spring.cloud.function.definition");
@@ -127,6 +128,16 @@ public class FunctionInvoker implements RequestStreamHandler {
}
this.function = functionCatalog.lookup(functionName, "application/json");
if (this.function == null && !CollectionUtils.isEmpty(functionCatalog.getNames(null))) {
if (logger.isInfoEnabled()) {
logger.info("More then one function is available in FunctionCatalog. Will default to RoutingFunction, "
+ "expecting 'spring.cloud.function.definition' or 'spring.cloud.function.routing-expression' as Message headers. "
+ "If invocation is over API Gateway, Message headers can be provided as HTTP headers.");
}
this.function = functionCatalog.lookup(RoutingFunction.FUNCTION_NAME, "application/json");
}
if (this.function.isOutputTypePublisher()) {
this.function.setSkipOutputConversion(true);
}

View File

@@ -19,6 +19,7 @@ package org.springframework.cloud.function.adapter.aws;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.Consumer;
@@ -31,6 +32,7 @@ import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
@@ -42,6 +44,7 @@ import org.springframework.messaging.converter.AbstractMessageConverter;
import org.springframework.util.MimeType;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
/**
*
@@ -363,6 +366,13 @@ public class FunctionInvokerTests {
" \"isBase64Encoded\": false\n" +
"}";
@BeforeEach
public void before() throws Exception {
System.clearProperty("MAIN_CLASS");
System.clearProperty("spring.cloud.function.routing-expression");
System.clearProperty("spring.cloud.function.definition");
this.getEnvironment().clear();
}
@Test
public void testCollection() throws Exception {
@@ -706,6 +716,61 @@ public class FunctionInvokerTests {
assertThat(result.get("body")).isEqualTo("\"OK\"");
}
@Test
public void testWithDefaultRoutingFailure() throws Exception {
System.setProperty("MAIN_CLASS", SampleConfiguration.class.getName());
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
try {
invoker.handleRequest(targetStream, output, null);
fail();
}
catch (Exception e) {
// success, since no definition nor routing instructions are provided
}
}
@SuppressWarnings("rawtypes")
@Test
public void testWithDefaultRouting() throws Exception {
System.setProperty("MAIN_CLASS", SampleConfiguration.class.getName());
System.setProperty("spring.cloud.function.routing-expression", "'reverse'");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
Map result = mapper.readValue(output.toByteArray(), Map.class);
assertThat(result.get("body")).isEqualTo("\"olleh\"");
}
@SuppressWarnings("rawtypes")
@Test
public void testWithDefinitionEnvVariable() throws Exception {
System.setProperty("MAIN_CLASS", SampleConfiguration.class.getName());
this.getEnvironment().put("SPRING_CLOUD_FUNCTION_DEFINITION", "reverse|uppercase");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
Map result = mapper.readValue(output.toByteArray(), Map.class);
assertThat(result.get("body")).isEqualTo("\"OLLEH\"");
}
@SuppressWarnings("unchecked")
private Map<String, String> getEnvironment() throws Exception {
Map<String, String> env = System.getenv();
Field field = env.getClass().getDeclaredField("m");
field.setAccessible(true);
return (Map<String, String>) field.get(env);
}
@EnableAutoConfiguration
@Configuration
public static class SampleConfiguration {
@@ -714,6 +779,16 @@ public class FunctionInvokerTests {
return v -> v;
}
@Bean
public Function<String, String> uppercase() {
return v -> v.toUpperCase();
}
@Bean
public Function<String, String> reverse() {
return v -> new StringBuilder(v).reverse().toString();
}
@Bean
public Function<Flux<String>, Flux<String>> echoStringReactive() {
return v -> v;