From 528419cfd32e0309c150e5d885bd1477704087e6 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 27 May 2020 13:49:55 +0200 Subject: [PATCH] Fix AWS FunctionInvoker to ensure it properly interprets translation of incoming APIGateway request to user FunctionInvoker This assumes that - 'body' will be extracted for cases such as POJO or String input - Map input will simply represent the incoming request - APIGatewayProxyRequestEvent input represents native representation of API Gateway request Similar fixes went for Kinesis side of things Added additional tests for both Kinesis and ApiGateway --- .../function/adapter/aws/FunctionInvoker.java | 107 +++-- .../adapter/aws/FunctionInvokerTests.java | 404 +++++++++++++++--- 2 files changed, 420 insertions(+), 91 deletions(-) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index 88b59826e..f2273bd32 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -19,15 +19,18 @@ package org.springframework.cloud.function.adapter.aws; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.nio.charset.StandardCharsets; import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; -import java.util.function.Function; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.JsonDeserializer; @@ -40,11 +43,14 @@ import org.apache.commons.logging.LogFactory; import org.springframework.boot.SpringApplication; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.cloud.function.context.catalog.FunctionTypeUtils; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.utils.FunctionClassUtils; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.Environment; import org.springframework.http.HttpStatus; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.StreamUtils; @@ -64,42 +70,52 @@ public class FunctionInvoker implements RequestStreamHandler { private ObjectMapper mapper; - private Function, Message> function; + private FunctionInvocationWrapper function; public FunctionInvoker() { this.start(); } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { + Message requestMessage = this.generateMessage(input, context); - Message requestMessage = this.generateMessage(input, context); - - Message responseMessage = this.function.apply(requestMessage); + Message responseMessage = (Message) this.function.apply(requestMessage); byte[] responseBytes = responseMessage.getPayload(); - Map requestPayloadMap = this.getRequestPayloadAsMap(requestMessage); - if (requestPayloadMap != null && requestPayloadMap.containsKey("httpMethod")) { + if (requestMessage.getHeaders().containsKey("httpMethod") || requestMessage.getPayload() instanceof APIGatewayProxyRequestEvent) { // API Gateway Map response = new HashMap(); response.put("isBase64Encoded", false); - int statusCode = responseMessage.getHeaders().containsKey("statusCode") - ? (int) responseMessage.getHeaders().get("statusCode") + MessageHeaders headers = responseMessage.getHeaders(); + int statusCode = headers.containsKey("statusCode") + ? (int) headers.get("statusCode") : 200; - HttpStatus httpStatus = HttpStatus.valueOf(statusCode); - response.put("statusCode", statusCode); - response.put("statusDescription", httpStatus.toString()); - response.put("body", new String(responseMessage.getPayload(), StandardCharsets.UTF_8)); - response.put("headers", responseMessage.getHeaders()); + if (isKinesis(requestMessage)) { + HttpStatus httpStatus = HttpStatus.valueOf(statusCode); + response.put("statusDescription", httpStatus.toString()); + } + String body = new String(responseMessage.getPayload(), StandardCharsets.UTF_8).replaceAll("\"", ""); + response.put("body", body); + + Map responseHeaders = new HashMap<>(); + headers.keySet().forEach(key -> responseHeaders.put(key, headers.get(key).toString())); + + response.put("headers", responseHeaders); responseBytes = mapper.writeValueAsBytes(response); } StreamUtils.copy(responseBytes, output); } + private boolean isKinesis(Message requestMessage) { + return requestMessage.getHeaders().containsKey("Records"); + } + private void start() { ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass()); Environment environment = context.getEnvironment(); @@ -140,22 +156,61 @@ public class FunctionInvoker implements RequestStreamHandler { mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); } + @SuppressWarnings({ "unchecked", "rawtypes" }) private Message generateMessage(InputStream input, Context context) throws IOException { byte[] payload = StreamUtils.copyToByteArray(input); - Message message = MessageBuilder.withPayload(payload).setHeader("aws-context", context).build(); + if (logger.isInfoEnabled()) { + logger.info("===> Incoming JSON for ApiGateway Event: " + new String(payload)); + } + Message message = null; + Object request = this.mapper.readValue(payload, Object.class); + Type inputType = FunctionTypeUtils.getInputType(function.getFunctionType(), 0); + if (FunctionTypeUtils.isMessage(inputType)) { + inputType = FunctionTypeUtils.getImmediateGenericType(inputType, 0); + } + boolean mapInputType = (inputType instanceof ParameterizedType && ((Class) ((ParameterizedType) inputType).getRawType()).isAssignableFrom(Map.class)); + if (request instanceof Map) { + Map requestMap = (Map) request; + if (requestMap.containsKey("Records")) { + logger.info("Incoming request is Kinesis Event"); + Assert.isTrue(inputType instanceof Class && KinesisEvent.class.isAssignableFrom((Class) inputType) || mapInputType, + "Only KinesisEvent or Map type is supported as input type for functions that accept with Kinesis Event"); + Object event = mapInputType ? requestMap : this.mapper.convertValue(requestMap, KinesisEvent.class); + message = MessageBuilder.withPayload(event).setHeader("aws-context", context).build(); + } + else if (requestMap.containsKey("httpMethod")) { + logger.info("Incoming request is API Gateway"); + if (inputType.getTypeName().endsWith(APIGatewayProxyRequestEvent.class.getSimpleName())) { + APIGatewayProxyRequestEvent gatewayEvent = this.mapper.convertValue(requestMap, APIGatewayProxyRequestEvent.class); + message = MessageBuilder.withPayload(gatewayEvent).setHeader("aws-context", context).build(); + } + else if (mapInputType) { + message = MessageBuilder.withPayload(requestMap) + .setHeader("httpMethod", requestMap.get("httpMethod")) + .setHeader("aws-context", context) + .build(); + } + else { + Object body = requestMap.remove("body"); + if (body instanceof String) { + body = ("\"" + body + "\"").getBytes(StandardCharsets.UTF_8); + } + else { // assume array or map + body = mapper.writeValueAsBytes(body); + } + + message = MessageBuilder.withPayload(body) + .copyHeaders(requestMap) + .setHeader("aws-context", context) + .build(); + } + } + } + if (message == null) { + message = MessageBuilder.withPayload(payload).setHeader("aws-context", context).build(); + } return message; } - - @SuppressWarnings("unchecked") - private Map getRequestPayloadAsMap(Message message) { - try { - return this.mapper.readValue(message.getPayload(), Map.class); - } - catch (Exception e) { - // ignore - } - return null; - } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index 8bbf4070e..eb1e7877c 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2019 the original author or authors. + * Copyright 2012-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,10 @@ import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.function.Function; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -41,6 +43,8 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class FunctionInvokerTests { + ObjectMapper mapper = new ObjectMapper(); + String sampleLBEvent = "{" + " \"requestContext\": {" + " \"elb\": {" + @@ -102,59 +106,179 @@ public class FunctionInvokerTests { " ]" + "}"; - @SuppressWarnings("rawtypes") -// @Test - public void testLBStringMessageEvent() throws Exception { - System.setProperty("MAIN_CLASS", GenericConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoStringMessage"); - FunctionInvoker invoker = new FunctionInvoker(); + String apiGatewayEvent = "{\n" + + " \"resource\": \"/uppercase2\",\n" + + " \"path\": \"/uppercase2\",\n" + + " \"httpMethod\": \"POST\",\n" + + " \"headers\": {\n" + + " \"accept\": \"*/*\",\n" + + " \"content-type\": \"application/json\",\n" + + " \"Host\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" + + " \"User-Agent\": \"curl/7.54.0\",\n" + + " \"X-Amzn-Trace-Id\": \"Root=1-5ece339e-e0595766066d703ec70f1522\",\n" + + " \"X-Forwarded-For\": \"90.37.8.133\",\n" + + " \"X-Forwarded-Port\": \"443\",\n" + + " \"X-Forwarded-Proto\": \"https\"\n" + + " },\n" + + " \"multiValueHeaders\": {\n" + + " \"accept\": [\n" + + " \"*/*\"\n" + + " ],\n" + + " \"content-type\": [\n" + + " \"application/json\"\n" + + " ],\n" + + " \"Host\": [\n" + + " \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\"\n" + + " ],\n" + + " \"User-Agent\": [\n" + + " \"curl/7.54.0\"\n" + + " ],\n" + + " \"X-Amzn-Trace-Id\": [\n" + + " \"Root=1-5ece339e-e0595766066d703ec70f1522\"\n" + + " ],\n" + + " \"X-Forwarded-For\": [\n" + + " \"90.37.8.133\"\n" + + " ],\n" + + " \"X-Forwarded-Port\": [\n" + + " \"443\"\n" + + " ],\n" + + " \"X-Forwarded-Proto\": [\n" + + " \"https\"\n" + + " ]\n" + + " },\n" + + " \"queryStringParameters\": null,\n" + + " \"multiValueQueryStringParameters\": null,\n" + + " \"pathParameters\": null,\n" + + " \"stageVariables\": null,\n" + + " \"requestContext\": {\n" + + " \"resourceId\": \"qf0io6\",\n" + + " \"resourcePath\": \"/uppercase2\",\n" + + " \"httpMethod\": \"POST\",\n" + + " \"extendedRequestId\": \"NL0A1EokCGYFZOA=\",\n" + + " \"requestTime\": \"27/May/2020:09:32:14 +0000\",\n" + + " \"path\": \"/test/uppercase2\",\n" + + " \"accountId\": \"313369169943\",\n" + + " \"protocol\": \"HTTP/1.1\",\n" + + " \"stage\": \"test\",\n" + + " \"domainPrefix\": \"fhul32ccy2\",\n" + + " \"requestTimeEpoch\": 1590571934872,\n" + + " \"requestId\": \"b96500aa-f92a-43c3-9360-868ba4053a00\",\n" + + " \"identity\": {\n" + + " \"cognitoIdentityPoolId\": null,\n" + + " \"accountId\": null,\n" + + " \"cognitoIdentityId\": null,\n" + + " \"caller\": null,\n" + + " \"sourceIp\": \"90.37.8.133\",\n" + + " \"principalOrgId\": null,\n" + + " \"accessKey\": null,\n" + + " \"cognitoAuthenticationType\": null,\n" + + " \"cognitoAuthenticationProvider\": null,\n" + + " \"userArn\": null,\n" + + " \"userAgent\": \"curl/7.54.0\",\n" + + " \"user\": null\n" + + " },\n" + + " \"domainName\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" + + " \"apiId\": \"fhul32ccy2\"\n" + + " },\n" + + " \"body\":\"hello\",\n" + + " \"isBase64Encoded\": false\n" + + "}"; - InputStream targetStream = new ByteArrayInputStream(this.sampleLBEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); + String apiGatewayEventWithStructuredBody = "{\n" + + " \"resource\": \"/uppercase2\",\n" + + " \"path\": \"/uppercase2\",\n" + + " \"httpMethod\": \"POST\",\n" + + " \"headers\": {\n" + + " \"accept\": \"*/*\",\n" + + " \"content-type\": \"application/json\",\n" + + " \"Host\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" + + " \"User-Agent\": \"curl/7.54.0\",\n" + + " \"X-Amzn-Trace-Id\": \"Root=1-5ece339e-e0595766066d703ec70f1522\",\n" + + " \"X-Forwarded-For\": \"90.37.8.133\",\n" + + " \"X-Forwarded-Port\": \"443\",\n" + + " \"X-Forwarded-Proto\": \"https\"\n" + + " },\n" + + " \"multiValueHeaders\": {\n" + + " \"accept\": [\n" + + " \"*/*\"\n" + + " ],\n" + + " \"content-type\": [\n" + + " \"application/json\"\n" + + " ],\n" + + " \"Host\": [\n" + + " \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\"\n" + + " ],\n" + + " \"User-Agent\": [\n" + + " \"curl/7.54.0\"\n" + + " ],\n" + + " \"X-Amzn-Trace-Id\": [\n" + + " \"Root=1-5ece339e-e0595766066d703ec70f1522\"\n" + + " ],\n" + + " \"X-Forwarded-For\": [\n" + + " \"90.37.8.133\"\n" + + " ],\n" + + " \"X-Forwarded-Port\": [\n" + + " \"443\"\n" + + " ],\n" + + " \"X-Forwarded-Proto\": [\n" + + " \"https\"\n" + + " ]\n" + + " },\n" + + " \"queryStringParameters\": null,\n" + + " \"multiValueQueryStringParameters\": null,\n" + + " \"pathParameters\": null,\n" + + " \"stageVariables\": null,\n" + + " \"requestContext\": {\n" + + " \"resourceId\": \"qf0io6\",\n" + + " \"resourcePath\": \"/uppercase2\",\n" + + " \"httpMethod\": \"POST\",\n" + + " \"extendedRequestId\": \"NL0A1EokCGYFZOA=\",\n" + + " \"requestTime\": \"27/May/2020:09:32:14 +0000\",\n" + + " \"path\": \"/test/uppercase2\",\n" + + " \"accountId\": \"313369169943\",\n" + + " \"protocol\": \"HTTP/1.1\",\n" + + " \"stage\": \"test\",\n" + + " \"domainPrefix\": \"fhul32ccy2\",\n" + + " \"requestTimeEpoch\": 1590571934872,\n" + + " \"requestId\": \"b96500aa-f92a-43c3-9360-868ba4053a00\",\n" + + " \"identity\": {\n" + + " \"cognitoIdentityPoolId\": null,\n" + + " \"accountId\": null,\n" + + " \"cognitoIdentityId\": null,\n" + + " \"caller\": null,\n" + + " \"sourceIp\": \"90.37.8.133\",\n" + + " \"principalOrgId\": null,\n" + + " \"accessKey\": null,\n" + + " \"cognitoAuthenticationType\": null,\n" + + " \"cognitoAuthenticationProvider\": null,\n" + + " \"userArn\": null,\n" + + " \"userAgent\": \"curl/7.54.0\",\n" + + " \"user\": null\n" + + " },\n" + + " \"domainName\": \"fhul32ccy2.execute-api.eu-west-3.amazonaws.com\",\n" + + " \"apiId\": \"fhul32ccy2\"\n" + + " },\n" + + " \"body\":{\"name\":\"Jim Lahey\"},\n" + + " \"isBase64Encoded\": false\n" + + "}"; - String result = new String(output.toByteArray(), StandardCharsets.UTF_8); - - ObjectMapper mapper = new ObjectMapper(); - Map responseMap = mapper.readValue(result, Map.class); - assertThat(responseMap.get("statusCode")).isEqualTo(200); - assertThat(responseMap.get("statusDescription")).isEqualTo("200 OK"); - } - -// @Test - public void testKinesisStringMessageEvent() throws Exception { - System.setProperty("MAIN_CLASS", GenericConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoStringMessage"); - FunctionInvoker invoker = new FunctionInvoker(); - - InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - - String result = new String(output.toByteArray(), StandardCharsets.UTF_8); - assertThat(result).isEqualTo(this.sampleKinesisEvent); - } - -// @Test + @Test public void testKinesisStringEvent() throws Exception { - System.setProperty("MAIN_CLASS", GenericConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoString"); - FunctionInvoker invoker = new FunctionInvoker(); + Assertions.assertThrows(IllegalArgumentException.class, () -> { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoString"); + FunctionInvoker invoker = new FunctionInvoker(); - InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); - ByteArrayOutputStream output = new ByteArrayOutputStream(); - invoker.handleRequest(targetStream, output, null); - - String result = new String(output.toByteArray(), StandardCharsets.UTF_8); - System.out.println(result); - assertThat(result).isEqualTo(this.sampleKinesisEvent); + InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + }); } - @Test public void testKinesisEvent() throws Exception { System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); - System.setProperty("spring.cloud.function.definition", "echoKinesisEvent"); + System.setProperty("spring.cloud.function.definition", "inputKinesisEvent"); FunctionInvoker invoker = new FunctionInvoker(); InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); @@ -162,42 +286,192 @@ public class FunctionInvokerTests { invoker.handleRequest(targetStream, output, null); String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("49590338271490256608559692538361571095921575989136588898"); + } + + @Test + public void testKinesisEventAsMessage() throws Exception { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputKinesisEventAsMessage"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("49590338271490256608559692538361571095921575989136588898"); + } + + @Test + public void testKinesisEventAsMap() throws Exception { + System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputKinesisEventAsMap"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("49590338271490256608559692538361571095921575989136588898"); + } + + @SuppressWarnings("rawtypes") + @Test + public void testApiGatewayStringEventBody() throws Exception { + System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "uppercase"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + ObjectMapper mapper = new ObjectMapper(); + Map result = mapper.readValue(output.toByteArray(), Map.class); + assertThat(result.get("body")).isEqualTo("HELLO"); + } + + @SuppressWarnings("rawtypes") + @Test + public void testApiGatewayMapEventBody() throws Exception { + System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "uppercasePojo"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEventWithStructuredBody.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + Map result = mapper.readValue(output.toByteArray(), Map.class); + assertThat(result.get("body")).isEqualTo("JIM LAHEY"); + } + + @SuppressWarnings("rawtypes") + @Test + public void testApiGatewayEvent() throws Exception { + System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputApiEvent"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + Map result = mapper.readValue(output.toByteArray(), Map.class); System.out.println(result); - assertThat(result).contains("\"sequenceNumber\":\"49590338271490256608559692538361571095921575989136588898\""); + assertThat(result.get("body")).isEqualTo("hello"); } - @EnableAutoConfiguration - @Configuration - public static class GenericConfiguration { + @SuppressWarnings("rawtypes") + @Test + public void testApiGatewayEventAsMessage() throws Exception { + System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputApiEventAsMessage"); + FunctionInvoker invoker = new FunctionInvoker(); - @Bean - public Function, Message> echoStringMessage() { - return v -> { - System.out.println("Received: " + v); - return v; - }; - } + InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); - @Bean - public Function echoString() { - return v -> { - System.out.println("Received: " + v); - return v; - }; - } + Map result = mapper.readValue(output.toByteArray(), Map.class); + System.out.println(result); + assertThat(result.get("body")).isEqualTo("hello"); } + @Test + public void testApiGatewayEventAsMap() throws Exception { + System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "inputApiEventAsMap"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + Map result = mapper.readValue(output.toByteArray(), Map.class); + System.out.println(result); + assertThat(result.get("body")).isEqualTo("hello"); + } @EnableAutoConfiguration @Configuration public static class KinesisConfiguration { + @Bean + public Function echoString() { + return v -> v; + } @Bean - public Function echoKinesisEvent() { + public Function inputKinesisEvent() { return v -> { System.out.println("Received: " + v); - return v; + return v.toString(); + }; + } + + @Bean + public Function, String> inputKinesisEventAsMessage() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); + }; + } + + @Bean + public Function, String> inputKinesisEventAsMap() { + return v -> { + System.out.println("Received: " + v); + return v.toString(); }; } } + + @EnableAutoConfiguration + @Configuration + public static class ApiGatewayConfiguration { + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function uppercasePojo() { + return v -> v.getName().toUpperCase(); + } + + @Bean + public Function inputApiEvent() { + return v -> { + return v.getBody(); + }; + } + + @Bean + public Function, String> inputApiEventAsMessage() { + return v -> { + return v.getPayload().getBody(); + }; + } + + @Bean + public Function, String> inputApiEventAsMap() { + return v -> { + String body = (String) v.get("body"); + return body; + }; + } + } + + public static class Person { + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } }