From 8c4c1aefdebc68e5c3344083a272bda2bd4fe36f Mon Sep 17 00:00:00 2001 From: Semyon Fishman Date: Wed, 13 Feb 2019 13:05:31 -0500 Subject: [PATCH] GH-256, Add AWS support for Consumer's and Supplier's Added support for Consumer and Supplier functions when executing within AWS Lambda. It appears that this was previously supported but was removed in 083d5e for some reason. Improve Consumer support for AWS API Gateway Consumer type functions, by definition, do not return anything; i.e. they do not have a response. However when executing within a Lambda + API Gateway environment, the Lambda must still return a 200-level status response to signal the API Gateway that the function executed succesfully. I added the logic for this. Improve Supplier support for AWS API Gateway Supplier functions, by defnition, do not take input; in the case of API Gateway calls this type of execution passes a null body, which fails to deserialize and throws an exception. I added logic that skips the deserialization of the body in the case it is a Supplier. --- .../SpringBootApiGatewayRequestHandler.java | 10 ++ .../adapter/aws/SpringBootRequestHandler.java | 2 +- .../aws/SpringFunctionInitializer.java | 118 +++++++++++++++--- .../aws/SpringFunctionInitializerTests.java | 18 +++ 4 files changed, 130 insertions(+), 18 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootApiGatewayRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootApiGatewayRequestHandler.java index 06b3fc96f..07c86f328 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootApiGatewayRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootApiGatewayRequestHandler.java @@ -19,6 +19,7 @@ package org.springframework.cloud.function.adapter.aws; import java.util.HashMap; import java.util.Map; +import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent; import com.fasterxml.jackson.core.JsonProcessingException; @@ -124,4 +125,13 @@ public class SpringBootApiGatewayRequestHandler extends } } + @Override + public Object handleRequest(APIGatewayProxyRequestEvent event, Context context) { + Object response = super.handleRequest(event, context); + if (returnsOutput()) + return response; + else + return new APIGatewayProxyResponseEvent() + .withStatusCode(HttpStatus.OK.value()); + } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java index 3f795447e..bb724ed40 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootRequestHandler.java @@ -44,7 +44,7 @@ public class SpringBootRequestHandler extends SpringFunctionInitializer @Override public Object handleRequest(E event, Context context) { initialize(); - Object input = convertEvent(event); + Object input = acceptsInput() ? convertEvent(event) : ""; Publisher output = apply(extract(input)); return result(input, output); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java index 8d86e0877..3e8ed3e2a 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializer.java @@ -20,10 +20,12 @@ import java.io.Closeable; import java.io.InputStream; import java.net.URL; import java.util.Collections; +import java.util.Iterator; import java.util.List; -import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.jar.Manifest; import org.apache.commons.logging.Log; @@ -50,6 +52,10 @@ public class SpringFunctionInitializer implements Closeable { private Function, Publisher> function; + private Consumer> consumer; + + private Supplier> supplier; + private AtomicBoolean initialized = new AtomicBoolean(); @Autowired(required = false) @@ -130,27 +136,83 @@ public class SpringFunctionInitializer implements Closeable { SpringApplication builder = springApplication(); ConfigurableApplicationContext context = builder.run(); context.getAutowireCapableBeanFactory().autowireBean(this); - String name = context.getEnvironment().getProperty("function.name"); - if (name == null) { - name = "function"; - } + this.context = context; if (this.catalog == null) { - if (context.containsBean(name)) { - this.function = context.getBean(name, Function.class); - } + initFunctionConsumerOrSupplierFromContext(); } else { - Set functionNames = this.catalog.getNames(Function.class); - if (functionNames.size() == 1) { - this.function = this.catalog.lookup(Function.class, - functionNames.iterator().next()); + initFunctionConsumerOrSupplierFromCatalog(); + } + } + + private String resolveName(Class type) { + String functionName = context.getEnvironment().getProperty("function.name"); + if (functionName != null) + return functionName; + else if (type.isAssignableFrom(Function.class)) + return "function"; + else if (type.isAssignableFrom(Consumer.class)) + return "consumer"; + else if (type.isAssignableFrom(Supplier.class)) + return "supplier"; + throw new IllegalStateException("Unknown type " + type); + } + + private void initFunctionConsumerOrSupplierFromContext() { + String name = resolveName(Function.class); + if (context.containsBean(name) && context.getBean(name) instanceof Function) { + this.function = context.getBean(name, Function.class); + return; + } + + name = resolveName(Consumer.class); + if (context.containsBean(name) && context.getBean(name) instanceof Consumer) { + this.consumer = context.getBean(name, Consumer.class); + return; + } + + name = resolveName(Supplier.class); + if (context.containsBean(name) && context.getBean(name) instanceof Supplier) { + this.supplier = context.getBean(name, Supplier.class); + return; + } + } + + private void initFunctionConsumerOrSupplierFromCatalog() { + String name = resolveName(Function.class); + this.function = this.catalog.lookup(Function.class, name); + if (this.function != null) + return; + + name = resolveName(Consumer.class); + this.consumer = this.catalog.lookup(Consumer.class, name); + if (this.consumer != null) + return; + + name = resolveName(Supplier.class); + this.supplier = this.catalog.lookup(Supplier.class, name); + if (this.supplier != null) + return; + + if (this.catalog.size() == 1) { + Iterator names = this.catalog.getNames(Function.class).iterator(); + if (names.hasNext()) { + this.function = this.catalog.lookup(Function.class, names.next()); + return; } - else { - this.function = this.catalog.lookup(Function.class, name); + + names = this.catalog.getNames(Consumer.class).iterator(); + if (names.hasNext()) { + this.consumer = this.catalog.lookup(Consumer.class, names.next()); + return; + } + + names = this.catalog.getNames(Supplier.class).iterator(); + if (names.hasNext()) { + this.supplier = this.catalog.lookup(Supplier.class, names.next()); + return; } } - this.context = context; - } private SpringApplication springApplication() { @@ -169,13 +231,35 @@ public class SpringFunctionInitializer implements Closeable { } protected Object function() { - return this.function; + if (this.function != null) + return this.function; + else if (this.consumer != null) + return this.consumer; + else if (this.supplier != null) + return this.supplier; + else + return null; + } + + protected boolean acceptsInput() { + return !this.inspector.getInputType(function()).equals(Void.class); + } + + protected boolean returnsOutput() { + return !this.inspector.getOutputType(function()).equals(Void.class); } protected Publisher apply(Publisher input) { if (this.function != null) { return Flux.from(this.function.apply(input)); } + if (this.consumer != null) { + this.consumer.accept(input); + return Flux.empty(); + } + if (this.supplier != null) { + return this.supplier.get(); + } throw new IllegalStateException("No function defined"); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java index 96bbaba6d..b0829f297 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringFunctionInitializerTests.java @@ -18,6 +18,7 @@ package org.springframework.cloud.function.adapter.aws; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.junit.After; @@ -102,6 +103,14 @@ public class SpringFunctionInitializerTests { assertThat(result.toStream().collect(Collectors.toList())).isEmpty(); } + @Test + public void supplierCatalog() { + initializer = new SpringFunctionInitializer(SupplierConfig.class); + initializer.initialize(); + Flux result = Flux.from(initializer.apply(Flux.empty())); + assertThat(result.blockFirst()).isInstanceOf(Bar.class); + } + @Configuration protected static class FluxFunctionConfig { @@ -161,6 +170,15 @@ public class SpringFunctionInitializerTests { } + @Configuration + @Import(ContextFunctionCatalogAutoConfiguration.class) + protected static class SupplierConfig { + @Bean + public Supplier supplier() { + return () -> new Bar(); + } + } + @Configuration @Import(ContextFunctionCatalogAutoConfiguration.class) protected static class ConsumerConfig {