GH-563 Add support for SNSEvent (AWS)

Resolves #563
This commit is contained in:
Oleg Zhurakousky
2020-07-24 15:53:03 +02:00
parent d447747a77
commit 8a15182266
3 changed files with 152 additions and 3 deletions

View File

@@ -33,6 +33,7 @@ import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
@@ -40,6 +41,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -142,6 +144,8 @@ public class FunctionInvoker implements RequestStreamHandler {
if (logger.isInfoEnabled()) {
logger.info("Located function: '" + functionName + "'");
}
mapper.registerModule(new JodaModule());
}
private void configureObjectMapper() {
@@ -179,20 +183,27 @@ public class FunctionInvoker implements RequestStreamHandler {
if (requestMap.containsKey("Records")) {
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
if (records.get(0).containsKey("kinesis")) {
if (this.isKinesisEvent(records.get(0))) {
logger.info("Incoming request is Kinesis Event");
Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else if (records.get(0).containsKey("s3")) {
else if (this.isS3Event(records.get(0))) {
logger.info("Incoming request is S3 Event");
Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only S3Event or Map type is supported as input type for functions that accept S3 Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else if (this.isSNSEvent(records.get(0))) {
logger.info("Incoming request is SNS Event");
Assert.isTrue(inputType instanceof Class && SNSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only SNSEvent or Map type is supported as input type for functions that accept SNSEvent");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SNSEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else {
logger.info("Incoming request is SQS Event");
Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
@@ -224,4 +235,16 @@ public class FunctionInvoker implements RequestStreamHandler {
}
return messageBuilder.setHeader("aws-context", context).build();
}
private boolean isSNSEvent(Map<String, ?> record) {
return record.containsKey("Sns");
}
private boolean isS3Event(Map<String, ?> record) {
return record.containsKey("s3");
}
private boolean isKinesisEvent(Map<String, ?> record) {
return record.containsKey("kinesis");
}
}

View File

@@ -26,6 +26,7 @@ import java.util.function.Function;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
@@ -92,6 +93,38 @@ public class FunctionInvokerTests {
" ]\n" +
"}";
String sampleSNSEvent = "{\n" +
" \"Records\": [\n" +
" {\n" +
" \"EventVersion\": \"1.0\",\n" +
" \"EventSubscriptionArn\": \"arn:aws:sns:us-east-2:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" +
" \"EventSource\": \"aws:sns\",\n" +
" \"Sns\": {\n" +
" \"SignatureVersion\": \"1\",\n" +
" \"Timestamp\": \"2019-01-02T12:45:07.000Z\",\n" +
" \"Signature\": \"tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==\",\n" +
" \"SigningCertUrl\": \"https://sns.us-east-2.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem\",\n" +
" \"MessageId\": \"95df01b4-ee98-5cb9-9903-4c221d41eb5e\",\n" +
" \"Message\": \"Hello from SNS!\",\n" +
" \"MessageAttributes\": {\n" +
" \"Test\": {\n" +
" \"Type\": \"String\",\n" +
" \"Value\": \"TestString\"\n" +
" },\n" +
" \"TestBinary\": {\n" +
" \"Type\": \"Binary\",\n" +
" \"Value\": \"TestBinary\"\n" +
" }\n" +
" },\n" +
" \"Type\": \"Notification\",\n" +
" \"UnsubscribeUrl\": \"https://sns.us-east-2.amazonaws.com/?Action=Unsubscribe&amp;SubscriptionArn=arn:aws:sns:us-east-2:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486\",\n" +
" \"TopicArn\":\"arn:aws:sns:us-east-2:123456789012:sns-lambda\",\n" +
" \"Subject\": \"TestInvoke\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}";
String sampleKinesisEvent = "{" +
" \"Records\": [" +
" {" +
@@ -434,6 +467,62 @@ public class FunctionInvokerTests {
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
}
@Test
public void testSNSStringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
}
@Test
public void testSNSEvent() throws Exception {
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSNSEvent");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sns");
}
@Test
public void testSNSEventAsMessage() throws Exception {
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSNSEventAsMessage");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sns");
}
@Test
public void testSNSEventAsMap() throws Exception {
System.setProperty("MAIN_CLASS", SNSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSNSEventAsMap");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSNSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sns");
}
@Test
public void testS3StringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
@@ -633,6 +722,39 @@ public class FunctionInvokerTests {
}
}
@EnableAutoConfiguration
@Configuration
public static class SNSConfiguration {
@Bean
public Function<String, String> echoString() {
return v -> v;
}
@Bean
public Function<SNSEvent, String> inputSNSEvent() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
@Bean
public Function<Message<SNSEvent>, String> inputSNSEventAsMessage() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
@Bean
public Function<Map<String, Object>, String> inputSNSEventAsMap() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
}
@EnableAutoConfiguration
@Configuration
public static class S3Configuration {