GH-1052 Fix collection/array processing for AWS invocations with Publisher input type functions

Resolves #1052
This commit is contained in:
Oleg Zhurakousky
2023-07-13 15:20:22 +02:00
parent 351e73b7f4
commit aff82e1457
7 changed files with 293 additions and 50 deletions

View File

@@ -74,14 +74,7 @@ public final class AWSLambdaUtils {
if (FunctionTypeUtils.isMessage(inputType) || FunctionTypeUtils.isPublisher(inputType)) {
inputType = FunctionTypeUtils.getImmediateGenericType(inputType, 0);
}
String typeName = inputType.getTypeName();
return typeName.equals("com.amazonaws.services.lambda.runtime.events.APIGatewayV2HTTPEvent")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.S3Event")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.SNSEvent")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.SQSEvent")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.APIGatewayCustomAuthorizerEvent")
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.KinesisEvent");
return FunctionTypeUtils.getRawType(inputType).getPackage().getName().startsWith("com.amazonaws.services.lambda.runtime.events");
}
@SuppressWarnings("rawtypes")
@@ -121,6 +114,9 @@ public final class AWSLambdaUtils {
MessageBuilder<byte[]> builder = MessageBuilder.withPayload(payload);
if (isApiGateway) {
builder.setHeader(AWSLambdaUtils.AWS_API_GATEWAY, true);
if (JsonMapper.isJsonStringRepresentsCollection(((Map) structMessage).get("body"))) {
builder.setHeader("payload", ((Map) structMessage).get("body"));
}
}
if (!isSupplier && AWSLambdaUtils.isSupportedAWSType(inputType)) {
builder.setHeader(AWSLambdaUtils.AWS_EVENT, true);
@@ -145,35 +141,50 @@ public final class AWSLambdaUtils {
}
}
private static Object convertFromJsonIfNecessary(Object value, JsonMapper objectMapper) {
if (JsonMapper.isJsonString(value)) {
return objectMapper.fromJson(value, Object.class);
}
return value;
}
@SuppressWarnings("unchecked")
public static byte[] generateOutputFromObject(Message<?> requestMessage, Object output, JsonMapper objectMapper, Type functionOutputType) {
Message<byte[]> responseMessage = null;
if (output instanceof Publisher<?>) {
List<Object> result = new ArrayList<>();
for (Object value : Flux.from((Publisher<?>) output).toIterable()) {
Message<?> lastMessage = null;
for (Object item : Flux.from((Publisher<?>) output).toIterable()) {
if (logger.isDebugEnabled()) {
logger.debug("Response value: " + value);
logger.debug("Response value: " + item);
}
if (item instanceof Message<?> message) {
result.add(convertFromJsonIfNecessary(message.getPayload(), objectMapper));
lastMessage = message;
}
else {
result.add(convertFromJsonIfNecessary(item, objectMapper));
}
result.add(value);
}
if (result.size() > 1) {
output = result;
byte[] resultPayload;
if (result.size() == 1) {
resultPayload = objectMapper.toJson(result.get(0));
}
else if (result.size() == 1) {
output = result.get(0);
else if (result.size() > 1) {
resultPayload = objectMapper.toJson(result);
}
else {
output = null;
resultPayload = null;
}
if (output instanceof Message<?> && ((Message<?>) output).getPayload() instanceof byte[]) {
responseMessage = (Message<byte[]>) output;
}
else if (output != null) {
if (logger.isDebugEnabled()) {
logger.debug("OUTPUT: " + output + " - " + output.getClass().getName());
if (resultPayload != null) {
System.out.println(new String(resultPayload));
MessageBuilder<byte[]> messageBuilder = MessageBuilder.withPayload(resultPayload);
if (lastMessage != null) {
messageBuilder.copyHeaders(lastMessage.getHeaders());
}
byte[] payload = objectMapper.toJson(output);
responseMessage = MessageBuilder.withPayload(payload).build();
responseMessage = messageBuilder.build();
}
}
else {

View File

@@ -86,7 +86,13 @@ class AWSTypesMessageConverter extends JsonMessageConverter {
return structMessage;
}
else {
Object body = structMessage.get("body");
Object body;
if (message.getHeaders().containsKey("payload")) {
body = message.getPayload();
}
else {
body = structMessage.get("body");
}
Object convertedResult = this.jsonMapper.fromJson(body, targetClass);
return convertedResult;
}

View File

@@ -16,6 +16,7 @@
package org.springframework.cloud.function.adapter.aws;
import java.io.ByteArrayInputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.SocketException;
@@ -44,11 +45,8 @@ import org.springframework.http.RequestEntity;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import static org.apache.http.HttpHeaders.USER_AGENT;
/**
@@ -107,7 +105,6 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle {
return this.running;
}
@SuppressWarnings("unchecked")
private void eventLoop(ConfigurableApplicationContext context) {
Environment environment = context.getEnvironment();
logger.info("Starting spring-cloud-function CustomRuntimeEventLoop");
@@ -140,33 +137,18 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle {
try {
FunctionInvocationWrapper function = locateFunction(environment, functionCatalog, response.getHeaders());
Message<byte[]> eventMessage = AWSLambdaUtils
.generateMessage(response.getBody().getBytes(StandardCharsets.UTF_8), function.getInputType(), function.isSupplier(), mapper);
ByteArrayInputStream is = new ByteArrayInputStream(response.getBody().getBytes(StandardCharsets.UTF_8));
Message<?> requestMessage = AWSLambdaUtils.generateMessage(is, function.getInputType(), function.isSupplier(), mapper, null);
if (logger.isDebugEnabled()) {
logger.debug("Event message: " + eventMessage);
}
Object functionResponse = function.apply(requestMessage);
byte[] responseBytes = AWSLambdaUtils.generateOutputFromObject(requestMessage, functionResponse, mapper, function.getOutputType());
String invocationUrl = MessageFormat
.format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
String traceId = response.getHeaders().getFirst("Lambda-Runtime-Trace-Id");
if (StringUtils.hasText(traceId)) {
if (logger.isDebugEnabled()) {
logger.debug("Lambda-Runtime-Trace-Id: " + traceId);
}
System.setProperty("com.amazonaws.xray.traceHeader", traceId);
}
Object responseObject = function.apply(eventMessage);
if (responseObject != null && logger.isDebugEnabled()) {
logger.debug("Reply from function: " + responseObject);
}
byte[] outputBody = AWSLambdaUtils.generateOutputFromObject(eventMessage, responseObject, mapper, function.getOutputType());
ResponseEntity<Object> result = rest.exchange(RequestEntity.post(URI.create(invocationUrl))
.header(USER_AGENT, USER_AGENT_VALUE)
.body(outputBody), Object.class);
.body(responseBytes), Object.class);
if (logger.isInfoEnabled()) {
logger.info("Result POST status: " + result);

View File

@@ -39,6 +39,90 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class CustomRuntimeEventLoopTest {
private String API_EVENT = "{\n"
+ " \"version\": \"1.0\",\n"
+ " \"resource\": \"$default\",\n"
+ " \"path\": \"/question\",\n"
+ " \"httpMethod\": \"POST\",\n"
+ " \"headers\": {\n"
+ " \"Content-Length\": \"40\",\n"
+ " \"Content-Type\": \"application/json\",\n"
+ " \"Host\": \"emcdxu5ijj.execute-api.us-east-2.amazonaws.com\",\n"
+ " \"User-Agent\": \"curl/7.88.1\",\n"
+ " \"X-Amzn-Trace-Id\": \"Root=1-64ad9787-4c89d5af7607eb9e522e01d5\",\n"
+ " \"X-Forwarded-For\": \"109.210.252.44\",\n"
+ " \"X-Forwarded-Port\": \"443\",\n"
+ " \"X-Forwarded-Proto\": \"https\",\n"
+ " \"accept\": \"*/*\"\n"
+ " },\n"
+ " \"multiValueHeaders\": {\n"
+ " \"Content-Length\": [\n"
+ " \"40\"\n"
+ " ],\n"
+ " \"Content-Type\": [\n"
+ " \"application/json\"\n"
+ " ],\n"
+ " \"Host\": [\n"
+ " \"emcdxu5ijj.execute-api.us-east-2.amazonaws.com\"\n"
+ " ],\n"
+ " \"User-Agent\": [\n"
+ " \"curl/7.88.1\"\n"
+ " ],\n"
+ " \"X-Amzn-Trace-Id\": [\n"
+ " \"Root=1-64ad9787-4c89d5af7607eb9e522e01d5\"\n"
+ " ],\n"
+ " \"X-Forwarded-For\": [\n"
+ " \"109.210.252.44\"\n"
+ " ],\n"
+ " \"X-Forwarded-Port\": [\n"
+ " \"443\"\n"
+ " ],\n"
+ " \"X-Forwarded-Proto\": [\n"
+ " \"https\"\n"
+ " ],\n"
+ " \"accept\": [\n"
+ " \"*/*\"\n"
+ " ]\n"
+ " },\n"
+ " \"queryStringParameters\": null,\n"
+ " \"multiValueQueryStringParameters\": null,\n"
+ " \"requestContext\": {\n"
+ " \"accountId\": \"313369169943\",\n"
+ " \"apiId\": \"emcdxu5ijj\",\n"
+ " \"domainName\": \"emcdxu5ijj.execute-api.us-east-2.amazonaws.com\",\n"
+ " \"domainPrefix\": \"emcdxu5ijj\",\n"
+ " \"extendedRequestId\": \"H6SdPgXtiYcEP1w=\",\n"
+ " \"httpMethod\": \"POST\",\n"
+ " \"identity\": {\n"
+ " \"accessKey\": null,\n"
+ " \"accountId\": null,\n"
+ " \"caller\": null,\n"
+ " \"cognitoAmr\": null,\n"
+ " \"cognitoAuthenticationProvider\": null,\n"
+ " \"cognitoAuthenticationType\": null,\n"
+ " \"cognitoIdentityId\": null,\n"
+ " \"cognitoIdentityPoolId\": null,\n"
+ " \"principalOrgId\": null,\n"
+ " \"sourceIp\": \"109.210.252.44\",\n"
+ " \"user\": null,\n"
+ " \"userAgent\": \"curl/7.88.1\",\n"
+ " \"userArn\": null\n"
+ " },\n"
+ " \"path\": \"/question\",\n"
+ " \"protocol\": \"HTTP/1.1\",\n"
+ " \"requestId\": \"H6SdPgXtiYcEP1w=\",\n"
+ " \"requestTime\": \"11/Jul/2023:17:55:19 +0000\",\n"
+ " \"requestTimeEpoch\": 1689098119662,\n"
+ " \"resourceId\": \"$default\",\n"
+ " \"resourcePath\": \"$default\",\n"
+ " \"stage\": \"$default\"\n"
+ " },\n"
+ " \"pathParameters\": null,\n"
+ " \"stageVariables\": null,\n"
+ " \"body\": \"[{\\\"latitude\\\": 41.34, \\\"longitude\\\": 2.78},{\\\"latitude\\\": 43.24, \\\"longitude\\\": 3.78}]\",\n"
+ " \"isBase64Encoded\": false\n"
+ "}";
@Test
public void testDefaultFunctionLookup() throws Exception {
testDefaultFunctionLookup("uppercase", SingleFunctionConfiguration.class);
@@ -98,6 +182,21 @@ public class CustomRuntimeEventLoopTest {
}
}
@Test
public void test_HANDLERWithApiGatewayRequestAndFlux() throws Exception {
try (ConfigurableApplicationContext userContext =
new SpringApplicationBuilder(MultipleFunctionConfiguration.class, AWSCustomRuntime.class)
.web(WebApplicationType.SERVLET)
.properties("_HANDLER=echoFlux", "server.port=0")
.run()) {
AWSCustomRuntime aws = userContext.getBean(AWSCustomRuntime.class);
String response = aws.exchange(API_EVENT).getPayload();
assertThat(response).contains("{\\\"latitude\\\":2.78,\\\"longitude\\\":41.34}");
assertThat(response).contains("{\\\"latitude\\\":3.78,\\\"longitude\\\":43.24}");
}
}
@Test
@DirtiesContext
public void test_definitionLookupAndComposition() throws Exception {
@@ -149,6 +248,13 @@ public class CustomRuntimeEventLoopTest {
public Function<Person, Person> uppercasePerson() {
return p -> new Person(p.getName().toUpperCase());
}
@Bean
public Function<Flux<GeoLocation>, Flux<GeoLocation>> echoFlux() {
return flux -> flux.map(g -> {
return new GeoLocation(g.longitude(), g.latitude());
});
}
}
@EnableAutoConfiguration
@@ -185,4 +291,7 @@ public class CustomRuntimeEventLoopTest {
}
}
public record GeoLocation(Float latitude, Float longitude) {
}
}

View File

@@ -74,6 +74,8 @@ public class FunctionInvokerTests {
String jsonCollection = "[\"Ricky\",\"Julien\",\"Bubbles\"]";
String jsonPojoCollection = "[{\"name\":\"Ricky\"},{\"name\":\"Julien\"},{\"name\":\"Julien\"}]";
String dynamoDbEvent = "{\n"
+ " \"Records\": [\n"
+ " {\n"
@@ -610,6 +612,84 @@ public class FunctionInvokerTests {
" \"isBase64Encoded\": false\n" +
"}";
String apiGatewayEventWithArray = "{\n" +
" \"resource\": \"/uppercase2\",\n" +
" \"path\": \"/uppercase2\",\n" +
" \"httpMethod\": \"POST\",\n" +
" \"headers\": {\n" +
" \"accept\": \"*/*\",\n" +
" \"content-type\": \"application/json\",\n" +
" \"Host\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" +
" \"User-Agent\": \"curl/7.54.0\",\n" +
" \"X-Amzn-Trace-Id\": \"Root=1-5ece339e-e0595766066d703ec70f1522\",\n" +
" \"X-Forwarded-For\": \"90.37.8.133\",\n" +
" \"X-Forwarded-Port\": \"443\",\n" +
" \"X-Forwarded-Proto\": \"https\"\n" +
" },\n" +
" \"multiValueHeaders\": {\n" +
" \"accept\": [\n" +
" \"*/*\"\n" +
" ],\n" +
" \"content-type\": [\n" +
" \"application/json\"\n" +
" ],\n" +
" \"Host\": [\n" +
" \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\"\n" +
" ],\n" +
" \"User-Agent\": [\n" +
" \"curl/7.54.0\"\n" +
" ],\n" +
" \"X-Amzn-Trace-Id\": [\n" +
" \"Root=1-5ece339e-e0595766066d703ec70f1522\"\n" +
" ],\n" +
" \"X-Forwarded-For\": [\n" +
" \"90.37.8.133\"\n" +
" ],\n" +
" \"X-Forwarded-Port\": [\n" +
" \"443\"\n" +
" ],\n" +
" \"X-Forwarded-Proto\": [\n" +
" \"https\"\n" +
" ]\n" +
" },\n" +
" \"queryStringParameters\": null,\n" +
" \"multiValueQueryStringParameters\": null,\n" +
" \"pathParameters\": null,\n" +
" \"stageVariables\": null,\n" +
" \"requestContext\": {\n" +
" \"resourceId\": \"qf0io6\",\n" +
" \"resourcePath\": \"/uppercase2\",\n" +
" \"httpMethod\": \"POST\",\n" +
" \"extendedRequestId\": \"NL0A1EokCGYFZOA=\",\n" +
" \"requestTime\": \"27/May/2020:09:32:14 +0000\",\n" +
" \"path\": \"/test/uppercase2\",\n" +
" \"accountId\": \"123456789098\",\n" +
" \"protocol\": \"HTTP/1.1\",\n" +
" \"stage\": \"test\",\n" +
" \"domainPrefix\": \"fhul32ccy2\",\n" +
" \"requestTimeEpoch\": 1590571934872,\n" +
" \"requestId\": \"b96500aa-f92a-43c3-9360-868ba4053a00\",\n" +
" \"identity\": {\n" +
" \"cognitoIdentityPoolId\": null,\n" +
" \"accountId\": null,\n" +
" \"cognitoIdentityId\": null,\n" +
" \"caller\": null,\n" +
" \"sourceIp\": \"90.37.8.133\",\n" +
" \"principalOrgId\": null,\n" +
" \"accessKey\": null,\n" +
" \"cognitoAuthenticationType\": null,\n" +
" \"cognitoAuthenticationProvider\": null,\n" +
" \"userArn\": null,\n" +
" \"userAgent\": \"curl/7.54.0\",\n" +
" \"user\": null\n" +
" },\n" +
" \"domainName\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" +
" \"apiId\": \"fhul32ccy2\"\n" +
" },\n" +
" \"body\":[{\"name\":\"Jim Lahey\"},{\"name\":\"Ricky\"}],\n" +
" \"isBase64Encoded\": false\n" +
"}";
String gwAuthorizerEvent = "{\n"
+ " \"type\":\"TOKEN\",\n"
+ " \"authorizationToken\":\"allow\",\n"
@@ -650,6 +730,19 @@ public class FunctionInvokerTests {
assertThat(result).isEqualTo(this.jsonCollection);
}
@Test
public void testCollectionPojo() throws Exception {
System.setProperty("MAIN_CLASS", SampleConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "echoPojoReactive");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.jsonPojoCollection.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).isEqualTo(this.jsonPojoCollection);
}
@Test
public void testKinesisStringEvent() throws Exception {
System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName());
@@ -1019,6 +1112,23 @@ public class FunctionInvokerTests {
assertThat(person.getName()).isEqualTo("JIM LAHEY");
}
@SuppressWarnings("rawtypes")
@Test
public void testApiGatewayPojoReturninPojoReactive() throws Exception {
System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "uppercasePojoReturnPojoReactive");
FunctionInvoker invoker = new FunctionInvoker();
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEventWithArray.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
Map response = mapper.readValue(output.toByteArray(), Map.class);
System.out.println(response);
// Person person = mapper.readValue((String) response.get("body"), Person.class);
// assertThat(person.getName()).isEqualTo("JIM LAHEY");
}
@SuppressWarnings("rawtypes")
@Test
public void testApiGatewayPojoEventBody() throws Exception {
@@ -1339,6 +1449,11 @@ public class FunctionInvokerTests {
public Function<Flux<String>, Flux<String>> echoStringReactive() {
return v -> v;
}
@Bean
public Function<Flux<Person>, Flux<Person>> echoPojoReactive() {
return v -> v;
}
}
@EnableAutoConfiguration
@@ -1602,6 +1717,15 @@ public class FunctionInvokerTests {
};
}
@Bean
public Function<Flux<Person>, Flux<Person>> uppercasePojoReturnPojoReactive() {
return flux -> flux.map(v -> {
Person p = new Person();
p.setName(v.getName().toUpperCase());
return p;
});
}
@Bean
public Function<APIGatewayProxyRequestEvent, String> inputApiEvent() {
return v -> {