@@ -32,6 +32,7 @@ import com.amazonaws.services.lambda.runtime.Context;
|
||||
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.S3Event;
|
||||
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
|
||||
import com.fasterxml.jackson.core.JsonParser;
|
||||
import com.fasterxml.jackson.databind.DeserializationContext;
|
||||
@@ -178,14 +179,20 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
if (requestMap.containsKey("Records")) {
|
||||
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
|
||||
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
|
||||
boolean kinesisEvent = records.get(0).containsKey("kinesis");
|
||||
if (kinesisEvent) {
|
||||
if (records.get(0).containsKey("kinesis")) {
|
||||
logger.info("Incoming request is Kinesis Event");
|
||||
Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
|
||||
"Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event");
|
||||
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class);
|
||||
messageBuilder = MessageBuilder.withPayload(event);
|
||||
}
|
||||
else if (records.get(0).containsKey("s3")) {
|
||||
logger.info("Incoming request is S3 Event");
|
||||
Assert.isTrue(inputType instanceof Class && S3Event.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
|
||||
"Only S3Event or Map type is supported as input type for functions that accept S3 Event");
|
||||
Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, S3Event.class);
|
||||
messageBuilder = MessageBuilder.withPayload(event);
|
||||
}
|
||||
else {
|
||||
logger.info("Incoming request is SQS Event");
|
||||
Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class<?>) inputType) || mapInputType,
|
||||
|
||||
@@ -25,6 +25,7 @@ import java.util.function.Function;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.S3Event;
|
||||
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
@@ -206,6 +207,45 @@ public class FunctionInvokerTests {
|
||||
" \"isBase64Encoded\": false\n" +
|
||||
"}";
|
||||
|
||||
String s3Event = "{\n" +
|
||||
" \"Records\":[\n" +
|
||||
" {\n" +
|
||||
" \"eventVersion\":\"2.1\",\n" +
|
||||
" \"eventSource\":\"aws:s3\",\n" +
|
||||
" \"awsRegion\":\"us-east-2\",\n" +
|
||||
" \"eventTime\":\"2020-07-15T21:29:41.365Z\",\n" +
|
||||
" \"eventName\":\"ObjectCreated:Put\",\n" +
|
||||
" \"userIdentity\":{\n" +
|
||||
" \"principalId\":\"AWS:AIxxx\"\n" +
|
||||
" },\n" +
|
||||
" \"requestParameters\":{\n" +
|
||||
" \"sourceIPAddress\":\"xxxx\"\n" +
|
||||
" },\n" +
|
||||
" \"responseElements\":{\n" +
|
||||
" \"x-amz-request-id\":\"xxxx\",\n" +
|
||||
" \"x-amz-id-2\":\"xxx/=\"\n" +
|
||||
" },\n" +
|
||||
" \"s3\":{\n" +
|
||||
" \"s3SchemaVersion\":\"1.0\",\n" +
|
||||
" \"configurationId\":\"New Data Delivery\",\n" +
|
||||
" \"bucket\":{\n" +
|
||||
" \"name\":\"bucket\",\n" +
|
||||
" \"ownerIdentity\":{\n" +
|
||||
" \"principalId\":\"xxx\"\n" +
|
||||
" },\n" +
|
||||
" \"arn\":\"arn:aws:s3:::bucket\"\n" +
|
||||
" },\n" +
|
||||
" \"object\":{\n" +
|
||||
" \"key\":\"test/file.geojson\",\n" +
|
||||
" \"size\":32711,\n" +
|
||||
" \"eTag\":\"aaaa\",\n" +
|
||||
" \"sequencer\":\"aaaa\"\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" }\n" +
|
||||
" ]\n" +
|
||||
"}";
|
||||
|
||||
String apiGatewayEventWithStructuredBody = "{\n" +
|
||||
" \"resource\": \"/uppercase2\",\n" +
|
||||
" \"path\": \"/uppercase2\",\n" +
|
||||
@@ -394,6 +434,61 @@ public class FunctionInvokerTests {
|
||||
assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3StringEvent() throws Exception {
|
||||
Assertions.assertThrows(IllegalArgumentException.class, () -> {
|
||||
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
|
||||
System.setProperty("spring.cloud.function.definition", "echoString");
|
||||
FunctionInvoker invoker = new FunctionInvoker();
|
||||
|
||||
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
invoker.handleRequest(targetStream, output, null);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3Event() throws Exception {
|
||||
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
|
||||
System.setProperty("spring.cloud.function.definition", "inputS3Event");
|
||||
FunctionInvoker invoker = new FunctionInvoker();
|
||||
|
||||
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
invoker.handleRequest(targetStream, output, null);
|
||||
|
||||
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
|
||||
assertThat(result).contains("s3SchemaVersion");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3EventAsMessage() throws Exception {
|
||||
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
|
||||
System.setProperty("spring.cloud.function.definition", "inputS3EventAsMessage");
|
||||
FunctionInvoker invoker = new FunctionInvoker();
|
||||
|
||||
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
invoker.handleRequest(targetStream, output, null);
|
||||
|
||||
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
|
||||
assertThat(result).contains("s3SchemaVersion");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testS3EventAsMap() throws Exception {
|
||||
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
|
||||
System.setProperty("spring.cloud.function.definition", "inputS3EventAsMap");
|
||||
FunctionInvoker invoker = new FunctionInvoker();
|
||||
|
||||
InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
invoker.handleRequest(targetStream, output, null);
|
||||
|
||||
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
|
||||
assertThat(result).contains("s3SchemaVersion");
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test
|
||||
public void testApiGatewayStringEventBody() throws Exception {
|
||||
@@ -538,6 +633,39 @@ public class FunctionInvokerTests {
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class S3Configuration {
|
||||
@Bean
|
||||
public Function<String, String> echoString() {
|
||||
return v -> v;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<S3Event, String> inputS3Event() {
|
||||
return v -> {
|
||||
System.out.println("Received: " + v);
|
||||
return v.toJson();
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Message<S3Event>, String> inputS3EventAsMessage() {
|
||||
return v -> {
|
||||
System.out.println("Received: " + v);
|
||||
return v.getPayload().toJson();
|
||||
};
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Map<String, Object>, String> inputS3EventAsMap() {
|
||||
return v -> {
|
||||
System.out.println("Received: " + v);
|
||||
return v.toString();
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
@Configuration
|
||||
public static class ApiGatewayConfiguration {
|
||||
|
||||
Reference in New Issue
Block a user