GH-709, GH-684, GH-641 Upgrade AWS dependency

Refactor and simplify AWSLambdaUtils to work with AWS serialization libraries
Add support for API v2 gateway event

Resolves #709
Resolves #684
Resolves #641
This commit is contained in:
Oleg Zhurakousky
2021-07-22 18:57:57 +02:00
parent 7d9e3e31fb
commit dc98f0b49f
5 changed files with 197 additions and 106 deletions

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.adapter.aws;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
@@ -23,12 +24,18 @@ import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2HTTPEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.serialization.PojoSerializer;
import com.amazonaws.services.lambda.runtime.serialization.events.LambdaEventSerializers;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
@@ -45,8 +52,6 @@ import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
/**
*
@@ -57,6 +62,8 @@ final class AWSLambdaUtils {
private static Log logger = LogFactory.getLog(AWSLambdaUtils.class);
private static final String AWS_API_GATEWAY = "aws-api-gateway";
private AWSLambdaUtils() {
}
@@ -66,71 +73,82 @@ final class AWSLambdaUtils {
return generateMessage(payload, headers, inputType, objectMapper, null);
}
private static boolean isSupportedAWSType(Type inputType) {
return APIGatewayV2HTTPEvent.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType))
|| S3Event.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType))
|| APIGatewayProxyRequestEvent.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType))
|| SNSEvent.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType))
|| SQSEvent.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType))
|| KinesisEvent.class.isAssignableFrom(FunctionTypeUtils.getRawType(inputType));
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Message<byte[]> generateMessage(byte[] payload, MessageHeaders headers,
Type inputType, ObjectMapper objectMapper, @Nullable Context awsContext) {
if (!objectMapper.isEnabled(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)) {
configureObjectMapper(objectMapper);
}
if (logger.isInfoEnabled()) {
logger.info("Incoming JSON Event: " + new String(payload));
}
MessageBuilder messageBuilder = null;
Object request;
try {
request = objectMapper.readValue(payload, Object.class);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
if (FunctionTypeUtils.isMessage(inputType)) {
inputType = FunctionTypeUtils.getImmediateGenericType(inputType, 0);
}
boolean mapInputType = (inputType instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) inputType).getRawType()).isAssignableFrom(Map.class));
if (request instanceof Map) {
Map<String, ?> requestMap = (Map<String, ?>) request;
if (requestMap.containsKey("Records")) {
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
logEvent(records);
messageBuilder = MessageBuilder.withPayload(payload);
}
else if (requestMap.containsKey("httpMethod")) { // API Gateway
MessageBuilder messageBuilder = null;
if (inputType != null && isSupportedAWSType(inputType)) {
PojoSerializer<?> serializer = LambdaEventSerializers.serializerFor(FunctionTypeUtils.getRawType(inputType), Thread.currentThread().getContextClassLoader());
Object event = serializer.fromJson(new ByteArrayInputStream(payload));
messageBuilder = MessageBuilder.withPayload(event);
if (event instanceof APIGatewayProxyRequestEvent || event instanceof APIGatewayV2HTTPEvent) {
messageBuilder.setHeader(AWS_API_GATEWAY, true);
logger.info("Incoming request is API Gateway");
if (isTypeAnApiGatewayRequest(inputType)) {
APIGatewayProxyRequestEvent gatewayEvent = objectMapper.convertValue(requestMap, APIGatewayProxyRequestEvent.class);
messageBuilder = MessageBuilder.withPayload(gatewayEvent);
}
else if (mapInputType) {
messageBuilder = MessageBuilder.withPayload(requestMap).setHeader("httpMethod", requestMap.get("httpMethod"));
}
else {
Object body = requestMap.remove("body");
try {
body = body instanceof String
? String.valueOf(body).getBytes(StandardCharsets.UTF_8)
: objectMapper.writeValueAsBytes(body);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
messageBuilder = MessageBuilder.withPayload(body).copyHeaders(requestMap);
}
}
Object providedHeaders = requestMap.remove("headers");
if (providedHeaders != null && providedHeaders instanceof Map) {
messageBuilder.removeHeader("headers");
messageBuilder.copyHeaders((Map<String, Object>) providedHeaders);
}
}
else if (request instanceof Iterable) {
messageBuilder = MessageBuilder.withPayload(request);
else {
if (!objectMapper.isEnabled(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)) {
configureObjectMapper(objectMapper);
}
Object request;
try {
request = objectMapper.readValue(payload, Object.class);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
if (request instanceof Map) {
if (((Map) request).containsKey("httpMethod")) { //API Gateway
boolean mapInputType = (inputType instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) inputType).getRawType()).isAssignableFrom(Map.class));
if (mapInputType) {
messageBuilder = MessageBuilder.withPayload(request).setHeader("httpMethod", ((Map) request).get("httpMethod"));
}
else {
Object body = ((Map) request).remove("body");
try {
body = body instanceof String
? String.valueOf(body).getBytes(StandardCharsets.UTF_8)
: objectMapper.writeValueAsBytes(body);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
messageBuilder = MessageBuilder.withPayload(body).copyHeaders(((Map) request));
}
messageBuilder.setHeader(AWS_API_GATEWAY, true);
}
Object providedHeaders = ((Map) request).remove("headers");
if (providedHeaders != null && providedHeaders instanceof Map) {
messageBuilder.removeHeader("headers");
messageBuilder.copyHeaders((Map<String, Object>) providedHeaders);
}
}
else if (request instanceof Iterable) {
messageBuilder = MessageBuilder.withPayload(request);
}
}
if (messageBuilder == null) {
messageBuilder = MessageBuilder.withPayload(payload);
}
@@ -145,12 +163,13 @@ final class AWSLambdaUtils {
@SuppressWarnings({ "rawtypes", "unchecked" })
public static byte[] generateOutput(Message requestMessage, Message<byte[]> responseMessage,
ObjectMapper objectMapper) {
if (!objectMapper.isEnabled(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)) {
configureObjectMapper(objectMapper);
}
byte[] responseBytes = responseMessage == null ? "\"OK\"".getBytes() : responseMessage.getPayload();
if (requestMessage.getHeaders().containsKey("httpMethod")
|| isPayloadAnApiGatewayRequest(requestMessage.getPayload())) { // API Gateway
if (requestMessage.getHeaders().containsKey(AWS_API_GATEWAY) && ((boolean) requestMessage.getHeaders().get(AWS_API_GATEWAY))) {
Map<String, Object> response = new HashMap<String, Object>();
response.put("isBase64Encoded", false);
@@ -186,7 +205,6 @@ final class AWSLambdaUtils {
throw new IllegalStateException("Failed to serialize AWS Lambda output", e);
}
}
return responseBytes;
}
@@ -206,51 +224,8 @@ final class AWSLambdaUtils {
objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
}
private static boolean isPayloadAnApiGatewayRequest(Object payload) {
return isAPIGatewayProxyRequestEventPresent()
? payload instanceof APIGatewayProxyRequestEvent
: false;
}
private static boolean isTypeAnApiGatewayRequest(Type type) {
return type != null && isAPIGatewayProxyRequestEventPresent()
? type.getTypeName().endsWith(APIGatewayProxyRequestEvent.class.getSimpleName())
: false;
}
private static boolean isAPIGatewayProxyRequestEventPresent() {
return ClassUtils.isPresent("com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent",
ClassUtils.getDefaultClassLoader());
}
private static void logEvent(List<Map<String, ?>> records) {
if (isKinesisEvent(records.get(0))) {
logger.info("Incoming request is Kinesis Event");
}
else if (isS3Event(records.get(0))) {
logger.info("Incoming request is S3 Event");
}
else if (isSNSEvent(records.get(0))) {
logger.info("Incoming request is SNS Event");
}
else {
logger.info("Incoming request is SQS Event");
}
}
private static boolean isRequestKinesis(Message<Object> requestMessage) {
return requestMessage.getHeaders().containsKey("Records");
}
private static boolean isSNSEvent(Map<String, ?> record) {
return record.containsKey("Sns");
}
private static boolean isS3Event(Map<String, ?> record) {
return record.containsKey("s3");
}
private static boolean isKinesisEvent(Map<String, ?> record) {
return record.containsKey("kinesis");
}
}

View File

@@ -27,6 +27,7 @@ import java.util.function.Function;
import java.util.function.Supplier;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2HTTPEvent;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
@@ -171,6 +173,77 @@ public class FunctionInvokerTests {
" ]" +
"}";
//https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-develop-integrations-lambda.html
String apiGatewayV2Event = "{\n" +
" \"version\": \"2.0\",\n" +
" \"routeKey\": \"$default\",\n" +
" \"rawPath\": \"/my/path\",\n" +
" \"rawQueryString\": \"parameter1=value1&parameter1=value2&parameter2=value\",\n" +
" \"cookies\": [\n" +
" \"cookie1\",\n" +
" \"cookie2\"\n" +
" ],\n" +
" \"headers\": {\n" +
" \"header1\": \"value1\",\n" +
" \"header2\": \"value1,value2\"\n" +
" },\n" +
" \"queryStringParameters\": {\n" +
" \"parameter1\": \"value1,value2\",\n" +
" \"parameter2\": \"value\"\n" +
" },\n" +
" \"requestContext\": {\n" +
" \"accountId\": \"123456789012\",\n" +
" \"apiId\": \"api-id\",\n" +
" \"authentication\": {\n" +
" \"clientCert\": {\n" +
" \"clientCertPem\": \"CERT_CONTENT\",\n" +
" \"subjectDN\": \"www.example.com\",\n" +
" \"issuerDN\": \"Example issuer\",\n" +
" \"serialNumber\": \"a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1\",\n" +
" \"validity\": {\n" +
" \"notBefore\": \"May 28 12:30:02 2019 GMT\",\n" +
" \"notAfter\": \"Aug 5 09:36:04 2021 GMT\"\n" +
" }\n" +
" }\n" +
" },\n" +
" \"authorizer\": {\n" +
" \"jwt\": {\n" +
" \"claims\": {\n" +
" \"claim1\": \"value1\",\n" +
" \"claim2\": \"value2\"\n" +
" },\n" +
" \"scopes\": [\n" +
" \"scope1\",\n" +
" \"scope2\"\n" +
" ]\n" +
" }\n" +
" },\n" +
" \"domainName\": \"id.execute-api.us-east-1.amazonaws.com\",\n" +
" \"domainPrefix\": \"id\",\n" +
" \"http\": {\n" +
" \"method\": \"POST\",\n" +
" \"path\": \"/my/path\",\n" +
" \"protocol\": \"HTTP/1.1\",\n" +
" \"sourceIp\": \"IP\",\n" +
" \"userAgent\": \"agent\"\n" +
" },\n" +
" \"requestId\": \"id\",\n" +
" \"routeKey\": \"$default\",\n" +
" \"stage\": \"$default\",\n" +
" \"time\": \"12/Mar/2020:19:03:58 +0000\",\n" +
" \"timeEpoch\": 1583348638390\n" +
" },\n" +
" \"body\": \"Hello from Lambda\",\n" +
" \"pathParameters\": {\n" +
" \"parameter1\": \"value1\"\n" +
" },\n" +
" \"isBase64Encoded\": false,\n" +
" \"stageVariables\": {\n" +
" \"stageVariable1\": \"value1\",\n" +
" \"stageVariable2\": \"value2\"\n" +
" }\n" +
"}";
String apiGatewayEvent = "{\n" +
" \"resource\": \"/uppercase2\",\n" +
" \"path\": \"/uppercase2\",\n" +
@@ -567,6 +640,12 @@ public class FunctionInvokerTests {
@Test
public void testS3Event() throws Exception {
// S3EventSerializer<S3Event> ser = new S3EventSerializer<S3Event>().withClass(S3Event.class).withClassLoader(S3Event.class.getClassLoader());
// InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes());
// S3Event event = ser.fromJson(targetStream);
// System.out.println(event);
System.setProperty("MAIN_CLASS", S3Configuration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputS3Event");
FunctionInvoker invoker = new FunctionInvoker();
@@ -653,6 +732,22 @@ public class FunctionInvokerTests {
assertThat(result.get("body")).isEqualTo("\"hello\"");
}
@SuppressWarnings("rawtypes")
@Test
public void testApiGatewayV2Event() throws Exception {
System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputApiV2Event");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayV2Event.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
Map result = mapper.readValue(output.toByteArray(), Map.class);
System.out.println(result);
assertThat(result.get("body")).isEqualTo("\"Hello from Lambda\"");
}
@SuppressWarnings("rawtypes")
@Test
public void testApiGatewayAsSupplier() throws Exception {
@@ -935,18 +1030,18 @@ public class FunctionInvokerTests {
}
@Bean
public Function<S3Event, String> inputS3Event() {
public Function<S3Event, String> inputS3Event(JsonMapper jsonMapper) {
return v -> {
System.out.println("Received: " + v);
return v.toJson();
return jsonMapper.toString(v);
};
}
@Bean
public Function<Message<S3Event>, String> inputS3EventAsMessage() {
public Function<Message<S3Event>, String> inputS3EventAsMessage(JsonMapper jsonMapper) {
return v -> {
System.out.println("Received: " + v);
return v.getPayload().toJson();
return jsonMapper.toString(v);
};
}
@@ -991,6 +1086,13 @@ public class FunctionInvokerTests {
};
}
@Bean
public Function<APIGatewayV2HTTPEvent, String> inputApiV2Event() {
return v -> {
return v.getBody();
};
}
@Bean
public Function<Message<APIGatewayProxyRequestEvent>, String> inputApiEventAsMessage() {
return v -> {

View File

@@ -27,6 +27,7 @@ import com.amazonaws.kinesis.agg.RecordAggregator;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
@@ -44,6 +45,7 @@ import static org.assertj.core.api.Assertions.fail;
/**
* @author Halvdan Hoem Grelland
*/
@Disabled
public class SpringBootKinesisEventHandlerTests {
private static final ObjectMapper mapper = new ObjectMapper();