diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml index 6138d07a2..38384ecd7 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml @@ -21,7 +21,7 @@ UTF-8 1.8 2.2.6 - 1.11.557 + 1.11.825 1.0.3 1.0.3 @@ -52,6 +52,10 @@ org.springframework.boot spring-boot-starter + + com.fasterxml.jackson.datatype + jackson-datatype-joda + com.amazonaws aws-lambda-java-log4j diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index d42a83117..c5596c7dc 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -33,6 +33,7 @@ import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; @@ -40,6 +41,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -142,6 +144,8 @@ public class FunctionInvoker implements RequestStreamHandler { if (logger.isInfoEnabled()) { logger.info("Located function: '" + functionName + "'"); } + + mapper.registerModule(new JodaModule()); } private void configureObjectMapper() { @@ -179,20 +183,27 @@ public class FunctionInvoker implements RequestStreamHandler { if (requestMap.containsKey("Records")) { List> records = (List>) requestMap.get("Records"); Assert.notEmpty(records, "Incoming event has no records: " + requestMap); - if (records.get(0).containsKey("kinesis")) { + if (this.isKinesisEvent(records.get(0))) { logger.info("Incoming request is Kinesis Event"); Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, "Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event"); Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); messageBuilder = MessageBuilder.withPayload(event); } - else if (records.get(0).containsKey("s3")) { + else if (this.isS3Event(records.get(0))) { logger.info("Incoming request is S3 Event"); Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class) inputType) || mapInputType, "Only S3Event or Map type is supported as input type for functions that accept S3 Event"); Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class); messageBuilder = MessageBuilder.withPayload(event); } + else if (this.isSNSEvent(records.get(0))) { + logger.info("Incoming request is SNS Event"); + Assert.isTrue(inputType instanceof Class && SNSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, + "Only SNSEvent or Map type is supported as input type for functions that accept SNSEvent"); + Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SNSEvent.class); + messageBuilder = MessageBuilder.withPayload(event); + } else { logger.info("Incoming request is SQS Event"); Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, @@ -224,4 +235,16 @@ public class FunctionInvoker implements RequestStreamHandler { } return messageBuilder.setHeader("aws-context", context).build(); } + + private boolean isSNSEvent(Map record) { + return record.containsKey("Sns"); + } + + private boolean isS3Event(Map record) { + return record.containsKey("s3"); + } + + private boolean isKinesisEvent(Map record) { + return record.containsKey("kinesis"); + } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index 9f80ea437..c20ac3916 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -26,6 +26,7 @@ import java.util.function.Function; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.S3Event; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Assertions; @@ -92,6 +93,38 @@ public class FunctionInvokerTests { " ]\n" + "}"; + String sampleSNSEvent = "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"EventVersion\": \"1.0\",\n" + + " \"EventSubscriptionArn\": \"arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"EventSource\": \"aws:sns\",\n" + + " \"Sns\": {\n" + + " \"SignatureVersion\": \"1\",\n" + + " \"Timestamp\": \"2019-01-02T12:45:07.000Z\",\n" + + " \"Signature\": \"tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==\",\n" + + " \"SigningCertUrl\": \"https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem\",\n" + + " \"MessageId\": \"95df01b4-ee98-5cb9-9903-4c221d41eb5e\",\n" + + " \"Message\": \"Hello from SNS!\",\n" + + " \"MessageAttributes\": {\n" + + " \"Test\": {\n" + + " \"Type\": \"String\",\n" + + " \"Value\": \"TestString\"\n" + + " },\n" + + " \"TestBinary\": {\n" + + " \"Type\": \"Binary\",\n" + + " \"Value\": \"TestBinary\"\n" + + " }\n" + + " },\n" + + " \"Type\": \"Notification\",\n" + + " \"UnsubscribeUrl\": \"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" + + " \"TopicArn\":\"arn:aws:sns:us-east-2:123456789012:sns-lambda\",\n" + + " \"Subject\": \"TestInvoke\"\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + String sampleKinesisEvent = "{" + " \"Records\": [" + " {" + @@ -434,6 +467,62 @@ public class FunctionInvokerTests { assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); } + @Test + public void testSNSStringEvent() throws Exception { + Assertions.assertThrows(IllegalArgumentException.class, () -> { + System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + }); + } + + @Test + public void testSNSEvent() throws Exception { + System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSNSEvent"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sns"); + } + + @Test + public void testSNSEventAsMessage() throws Exception { + System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSNSEventAsMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sns"); + } + + @Test + public void testSNSEventAsMap() throws Exception { + System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSNSEventAsMap"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sns"); + } + + @Test public void testS3StringEvent() throws Exception { Assertions.assertThrows(IllegalArgumentException.class, () -> { @@ -633,6 +722,39 @@ public class FunctionInvokerTests { } } + @EnableAutoConfiguration + @Configuration + public static class SNSConfiguration { + @Bean + public Function echoString() { + return v -> v; + } + + @Bean + public Function inputSNSEvent() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + + @Bean + public Function, String> inputSNSEventAsMessage() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + + @Bean + public Function, String> inputSNSEventAsMap() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + } + @EnableAutoConfiguration @Configuration public static class S3Configuration {