From 16f2cc85aa63446ffc63439d49c00a30bf694553 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Thu, 16 Jul 2020 09:41:23 +0200 Subject: [PATCH] GH-560 Add explicit support for S3 AWS Event Resolves #560 --- .../function/adapter/aws/FunctionInvoker.java | 11 +- .../adapter/aws/FunctionInvokerTests.java | 128 ++++++++++++++++++ 2 files changed, 137 insertions(+), 2 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index 2ecf94a4d..d42a83117 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -32,6 +32,7 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.S3Event; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; @@ -178,14 +179,20 @@ public class FunctionInvoker implements RequestStreamHandler { if (requestMap.containsKey("Records")) { List> records = (List>) requestMap.get("Records"); Assert.notEmpty(records, "Incoming event has no records: " + requestMap); - boolean kinesisEvent = records.get(0).containsKey("kinesis"); - if (kinesisEvent) { + if (records.get(0).containsKey("kinesis")) { logger.info("Incoming request is Kinesis Event"); Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, "Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event"); Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); messageBuilder = MessageBuilder.withPayload(event); } + else if (records.get(0).containsKey("s3")) { + logger.info("Incoming request is S3 Event"); + Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class) inputType) || mapInputType, + "Only S3Event or Map type is supported as input type for functions that accept S3 Event"); + Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class); + messageBuilder = MessageBuilder.withPayload(event); + } else { logger.info("Incoming request is SQS Event"); Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index cd06c8418..9f80ea437 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -25,6 +25,7 @@ import java.util.function.Function; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.S3Event; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Assertions; @@ -206,6 +207,45 @@ public class FunctionInvokerTests { " \"isBase64Encoded\": false\n" + "}"; + String s3Event = "{\n" + + " \"Records\":[\n" + + " {\n" + + " \"eventVersion\":\"2.1\",\n" + + " \"eventSource\":\"aws:s3\",\n" + + " \"awsRegion\":\"us-east-2\",\n" + + " \"eventTime\":\"2020-07-15T21:29:41.365Z\",\n" + + " \"eventName\":\"ObjectCreated:Put\",\n" + + " \"userIdentity\":{\n" + + " \"principalId\":\"AWS:AIxxx\"\n" + + " },\n" + + " \"requestParameters\":{\n" + + " \"sourceIPAddress\":\"xxxx\"\n" + + " },\n" + + " \"responseElements\":{\n" + + " \"x-amz-request-id\":\"xxxx\",\n" + + " \"x-amz-id-2\":\"xxx/=\"\n" + + " },\n" + + " \"s3\":{\n" + + " \"s3SchemaVersion\":\"1.0\",\n" + + " \"configurationId\":\"New Data Delivery\",\n" + + " \"bucket\":{\n" + + " \"name\":\"bucket\",\n" + + " \"ownerIdentity\":{\n" + + " \"principalId\":\"xxx\"\n" + + " },\n" + + " \"arn\":\"arn:aws:s3:::bucket\"\n" + + " },\n" + + " \"object\":{\n" + + " \"key\":\"test/file.geojson\",\n" + + " \"size\":32711,\n" + + " \"eTag\":\"aaaa\",\n" + + " \"sequencer\":\"aaaa\"\n" + + " }\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + String apiGatewayEventWithStructuredBody = "{\n" + " \"resource\": \"/uppercase2\",\n" + " \"path\": \"/uppercase2\",\n" + @@ -394,6 +434,61 @@ public class FunctionInvokerTests { assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); } + @Test + public void testS3StringEvent() throws Exception { + Assertions.assertThrows(IllegalArgumentException.class, () -> { + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + }); + } + + @Test + public void testS3Event() throws Exception { + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputS3Event"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("s3SchemaVersion"); + } + + @Test + public void testS3EventAsMessage() throws Exception { + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputS3EventAsMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("s3SchemaVersion"); + } + + @Test + public void testS3EventAsMap() throws Exception { + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputS3EventAsMap"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("s3SchemaVersion"); + } + @SuppressWarnings("rawtypes") @Test public void testApiGatewayStringEventBody() throws Exception { @@ -538,6 +633,39 @@ public class FunctionInvokerTests { } } + @EnableAutoConfiguration + @Configuration + public static class S3Configuration { + @Bean + public Function echoString() { + return v -> v; + } + + @Bean + public Function inputS3Event() { + return v -> { + System.out.println("Received: " + v); + return v.toJson(); + }; + } + + @Bean + public Function, String> inputS3EventAsMessage() { + return v -> { + System.out.println("Received: " + v); + return v.getPayload().toJson(); + }; + } + + @Bean + public Function, String> inputS3EventAsMap() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + } + @EnableAutoConfiguration @Configuration public static class ApiGatewayConfiguration {