From e8846443c4ed4bf20e1ed4b205036bfb1f3370d1 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 22 Jun 2020 08:21:04 +0200 Subject: [PATCH] GH-551 Fix support for AWS SQSEvent Resolves #551 --- .../function/adapter/aws/FunctionInvoker.java | 26 ++++- .../adapter/aws/FunctionInvokerTests.java | 110 ++++++++++++++++++ 2 files changed, 130 insertions(+), 6 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index 470dcb248..2ecf94a4d 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2019 the original author or authors. + * Copyright 2019-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,12 +25,14 @@ import java.nio.charset.StandardCharsets; import java.util.Calendar; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; @@ -174,11 +176,23 @@ public class FunctionInvoker implements RequestStreamHandler { if (request instanceof Map) { Map requestMap = (Map) request; if (requestMap.containsKey("Records")) { - logger.info("Incoming request is Kinesis Event"); - Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, - "Only KinesisEvent or Map type is supported as input type for functions that accept with Kinesis Event"); - Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); - messageBuilder = MessageBuilder.withPayload(event); + List> records = (List>) requestMap.get("Records"); + Assert.notEmpty(records, "Incoming event has no records: " + requestMap); + boolean kinesisEvent = records.get(0).containsKey("kinesis"); + if (kinesisEvent) { + logger.info("Incoming request is Kinesis Event"); + Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, + "Only KinesisEvent or Map type is supported as input type for functions that accept Kinesis Event"); + Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); + messageBuilder = MessageBuilder.withPayload(event); + } + else { + logger.info("Incoming request is SQS Event"); + Assert.isTrue(inputType instanceof Class && SQSEvent.class.isAssignableFrom((Class) inputType) || mapInputType, + "Only SQSEvent or Map type is supported as input type for functions that accept SQS Event"); + Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, SQSEvent.class); + messageBuilder = MessageBuilder.withPayload(event); + } } else if (requestMap.containsKey("httpMethod")) { // API Gateway logger.info("Incoming request is API Gateway"); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index 81ada8486..cd06c8418 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -25,6 +25,7 @@ import java.util.function.Function; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -69,6 +70,27 @@ public class FunctionInvokerTests { " \"body\": \"request_body\"" + "}"; + String sampleSQSEvent = "{\n" + + " \"Records\": [\n" + + " {\n" + + " \"messageId\": \"19dd0b57-b21e-4ac1-bd88-01bbb068cb78\",\n" + + " \"receiptHandle\": \"MessageReceiptHandle\",\n" + + " \"body\": \"Hello from SQS!\",\n" + + " \"attributes\": {\n" + + " \"ApproximateReceiveCount\": \"1\",\n" + + " \"SentTimestamp\": \"1523232000000\",\n" + + " \"SenderId\": \"123456789012\",\n" + + " \"ApproximateFirstReceiveTimestamp\": \"1523232000001\"\n" + + " },\n" + + " \"messageAttributes\": {},\n" + + " \"md5OfBody\": \"7b270e59b47ff90a553787216d55d91d\",\n" + + " \"eventSource\": \"aws:sqs\",\n" + + " \"eventSourceARN\": \"arn:aws:sqs:eu-central-1:123456789012:MyQueue\",\n" + + " \"awsRegion\": \"eu-central-1\"\n" + + " }\n" + + " ]\n" + + "}"; + String sampleKinesisEvent = "{" + " \"Records\": [" + " {" + @@ -317,6 +339,61 @@ public class FunctionInvokerTests { assertThat(result).contains("49590338271490256608559692538361571095921575989136588898"); } + @Test + public void testSQSStringEvent() throws Exception { + Assertions.assertThrows(IllegalArgumentException.class, () -> { + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + }); + } + + @Test + public void testSQSEvent() throws Exception { + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSQSEvent"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); + } + + @Test + public void testSQSEventAsMessage() throws Exception { + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSQSEventAsMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); + } + + @Test + public void testSQSEventAsMap() throws Exception { + System.setProperty("MAIN_CLASS", SQSConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputSQSEventAsMap"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleSQSEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("arn:aws:sqs:eu-central-1:123456789012:MyQueue"); + } + @SuppressWarnings("rawtypes") @Test public void testApiGatewayStringEventBody() throws Exception { @@ -428,6 +505,39 @@ public class FunctionInvokerTests { } } + @EnableAutoConfiguration + @Configuration + public static class SQSConfiguration { + @Bean + public Function echoString() { + return v -> v; + } + + @Bean + public Function inputSQSEvent() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + + @Bean + public Function, String> inputSQSEventAsMessage() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + + @Bean + public Function, String> inputSQSEventAsMap() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + } + @EnableAutoConfiguration @Configuration public static class ApiGatewayConfiguration {