GH-551 Fix support for AWS SQSEvent

Resolves #551
This commit is contained in:
Oleg Zhurakousky
2020-06-22 08:21:04 +02:00
parent 98fd8ca9cd
commit e8846443c4
2 changed files with 130 additions and 6 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2019 the original author or authors.
* Copyright 2019-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,12 +25,14 @@ import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -174,11 +176,23 @@ public class FunctionInvoker implements RequestStreamHandler {
if (request instanceof Map) {
Map<String, ?> requestMap = (Map<String, ?>) request;
if (requestMap.containsKey("Records")) {
logger.info("Incoming request is Kinesis Event");
Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only KinesisEvent or Map type is supported as input type for functions that accept with Kinesis Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
boolean kinesisEvent = records.get(0).containsKey("kinesis");
if (kinesisEvent) {
logger.info("Incoming request is Kinesis Event");
Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
else {
logger.info("Incoming request is SQS Event");
Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
"Only SQSEvent or Map type is supported as input type for functions that accept SQS Event");
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SQSEvent.class);
messageBuilder = MessageBuilder.withPayload(event);
}
}
else if (requestMap.containsKey("httpMethod")) { // API Gateway
logger.info("Incoming request is API Gateway");

View File

@@ -25,6 +25,7 @@ import java.util.function.Function;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -69,6 +70,27 @@ public class FunctionInvokerTests {
" \"body\": \"request_body\"" +
"}";
String sampleSQSEvent = "{\n" +
" \"Records\": [\n" +
" {\n" +
" \"messageId\": \"19dd0b57-b21e-4ac1-bd88-01bbb068cb78\",\n" +
" \"receiptHandle\": \"MessageReceiptHandle\",\n" +
" \"body\": \"Hello from SQS!\",\n" +
" \"attributes\": {\n" +
" \"ApproximateReceiveCount\": \"1\",\n" +
" \"SentTimestamp\": \"1523232000000\",\n" +
" \"SenderId\": \"123456789012\",\n" +
" \"ApproximateFirstReceiveTimestamp\": \"1523232000001\"\n" +
" },\n" +
" \"messageAttributes\": {},\n" +
" \"md5OfBody\": \"7b270e59b47ff90a553787216d55d91d\",\n" +
" \"eventSource\": \"aws:sqs\",\n" +
" \"eventSourceARN\": \"arn:aws:sqs:eu-central-1:123456789012:MyQueue\",\n" +
" \"awsRegion\": \"eu-central-1\"\n" +
" }\n" +
" ]\n" +
"}";
String sampleKinesisEvent = "{" +
" \"Records\": [" +
" {" +
@@ -317,6 +339,61 @@ public class FunctionInvokerTests {
assertThat(result).contains("49590338271490256608559692538361571095921575989136588898");
}
@Test
public void testSQSStringEvent() throws Exception {
Assertions.assertThrows(IllegalArgumentException.class, () -> {
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoString");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
});
}
@Test
public void testSQSEvent() throws Exception {
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSQSEvent");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
}
@Test
public void testSQSEventAsMessage() throws Exception {
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSQSEventAsMessage");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
}
@Test
public void testSQSEventAsMap() throws Exception {
System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputSQSEventAsMap");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
}
@SuppressWarnings("rawtypes")
@Test
public void testApiGatewayStringEventBody() throws Exception {
@@ -428,6 +505,39 @@ public class FunctionInvokerTests {
}
}
@EnableAutoConfiguration
@Configuration
public static class SQSConfiguration {
@Bean
public Function<String, String> echoString() {
return v -> v;
}
@Bean
public Function<SQSEvent, String> inputSQSEvent() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
@Bean
public Function<Message<SQSEvent>, String> inputSQSEventAsMessage() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
@Bean
public Function<Map<String, Object>, String> inputSQSEventAsMap() {
return v -> {
System.out.println("Received: " + v);
return v.toString();
};
}
}
@EnableAutoConfiguration
@Configuration
public static class ApiGatewayConfiguration {