GH-530, GH-630 Improvements to AWS Custom Runtime
This commit provides initial set of improvements to executing functions in AWS Custom Runtime - Consistent invocation model for functional as well as @Bean configuration models via new CustomRuntimeEventLoop as well as AWSLambdaUtils - Clean up classpath to decrease the size of the JAR/ZIP file - Configuration simplification which no longer requires enabling of function exporter It also allows user to define functions that rely on AWS types such as APIGatewayProxyRequestEvent The existing invocation model remains in tact for the time being. Both invocation models are mutually exclusing in theit setup to avoid potential conflict. Resolves #538 Resolves #630
This commit is contained in:
@@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.adapter.aws;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.Context;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.cloud.function.context.catalog.FunctionTypeUtils;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
*
|
||||
*/
|
||||
final class AWSLambdaUtils {
|
||||
|
||||
private static Log logger = LogFactory.getLog(AWSLambdaUtils.class);
|
||||
|
||||
private AWSLambdaUtils() {
|
||||
|
||||
}
|
||||
|
||||
public static Message<byte[]> generateMessage(byte[] payload, MessageHeaders headers,
|
||||
FunctionInvocationWrapper function, JsonMapper mapper) {
|
||||
return generateMessage(payload, headers, function, mapper, null);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public static Message<byte[]> generateMessage(byte[] payload, MessageHeaders headers,
|
||||
FunctionInvocationWrapper function, JsonMapper mapper, @Nullable Context awsContext) {
|
||||
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Incoming JSON for ApiGateway Event: " + new String(payload));
|
||||
}
|
||||
|
||||
MessageBuilder messageBuilder = null;
|
||||
Object request = mapper.fromJson(payload, Object.class);
|
||||
Type inputType = function.getInputType();
|
||||
if (FunctionTypeUtils.isMessage(inputType)) {
|
||||
inputType = FunctionTypeUtils.getImmediateGenericType(inputType, 0);
|
||||
}
|
||||
boolean mapInputType = (inputType instanceof ParameterizedType && ((Class<?>) ((ParameterizedType) inputType).getRawType()).isAssignableFrom(Map.class));
|
||||
if (request instanceof Map) {
|
||||
Map<String, ?> requestMap = (Map<String, ?>) request;
|
||||
if (requestMap.containsKey("Records")) {
|
||||
List<Map<String, ?>> records = (List<Map<String, ?>>) requestMap.get("Records");
|
||||
Assert.notEmpty(records, "Incoming event has no records: " + requestMap);
|
||||
logEvent(records);
|
||||
messageBuilder = MessageBuilder.withPayload(payload);
|
||||
}
|
||||
else if (requestMap.containsKey("httpMethod")) { // API Gateway
|
||||
logger.info("Incoming request is API Gateway");
|
||||
if (inputType.getTypeName().endsWith(APIGatewayProxyRequestEvent.class.getSimpleName())) {
|
||||
APIGatewayProxyRequestEvent gatewayEvent = mapper.fromJson(requestMap, APIGatewayProxyRequestEvent.class);
|
||||
messageBuilder = MessageBuilder.withPayload(gatewayEvent);
|
||||
}
|
||||
else if (mapInputType) {
|
||||
messageBuilder = MessageBuilder.withPayload(requestMap).setHeader("httpMethod", requestMap.get("httpMethod"));
|
||||
}
|
||||
else {
|
||||
Object body = requestMap.remove("body");
|
||||
body = body instanceof String ? String.valueOf(body).getBytes(StandardCharsets.UTF_8) : mapper.toJson(body);
|
||||
messageBuilder = MessageBuilder.withPayload(body).copyHeaders(requestMap);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (messageBuilder == null) {
|
||||
messageBuilder = MessageBuilder.withPayload(payload);
|
||||
}
|
||||
if (awsContext != null) {
|
||||
messageBuilder.setHeader("aws-context", awsContext);
|
||||
}
|
||||
return messageBuilder.copyHeaders(headers).build();
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public static byte[] generateOutput(Message requestMessage, Message<byte[]> responseMessage,
|
||||
JsonMapper mapper) {
|
||||
byte[] responseBytes = responseMessage.getPayload();
|
||||
if (requestMessage.getHeaders().containsKey("httpMethod") || requestMessage.getPayload() instanceof APIGatewayProxyRequestEvent) { // API Gateway
|
||||
Map<String, Object> response = new HashMap<String, Object>();
|
||||
response.put("isBase64Encoded", false);
|
||||
|
||||
MessageHeaders headers = responseMessage.getHeaders();
|
||||
int statusCode = headers.containsKey("statusCode")
|
||||
? (int) headers.get("statusCode")
|
||||
: 200;
|
||||
|
||||
response.put("statusCode", statusCode);
|
||||
if (isRequestKinesis(requestMessage)) {
|
||||
HttpStatus httpStatus = HttpStatus.valueOf(statusCode);
|
||||
response.put("statusDescription", httpStatus.toString());
|
||||
}
|
||||
|
||||
String body = new String(responseMessage.getPayload(), StandardCharsets.UTF_8).replaceAll("\\\"", "\"");
|
||||
response.put("body", body);
|
||||
|
||||
Map<String, String> responseHeaders = new HashMap<>();
|
||||
headers.keySet().forEach(key -> responseHeaders.put(key, headers.get(key).toString()));
|
||||
|
||||
response.put("headers", responseHeaders);
|
||||
responseBytes = mapper.toJson(response);
|
||||
}
|
||||
|
||||
return responseBytes;
|
||||
}
|
||||
|
||||
private static void logEvent(List<Map<String, ?>> records) {
|
||||
if (isKinesisEvent(records.get(0))) {
|
||||
logger.info("Incoming request is Kinesis Event");
|
||||
}
|
||||
else if (isS3Event(records.get(0))) {
|
||||
logger.info("Incoming request is S3 Event");
|
||||
}
|
||||
else if (isSNSEvent(records.get(0))) {
|
||||
logger.info("Incoming request is SNS Event");
|
||||
}
|
||||
else {
|
||||
logger.info("Incoming request is SQS Event");
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isRequestKinesis(Message<Object> requestMessage) {
|
||||
return requestMessage.getHeaders().containsKey("Records");
|
||||
}
|
||||
|
||||
private static boolean isSNSEvent(Map<String, ?> record) {
|
||||
return record.containsKey("Sns");
|
||||
}
|
||||
|
||||
private static boolean isS3Event(Map<String, ?> record) {
|
||||
return record.containsKey("s3");
|
||||
}
|
||||
|
||||
private static boolean isKinesisEvent(Map<String, ?> record) {
|
||||
return record.containsKey("kinesis");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.adapter.aws;
|
||||
|
||||
import java.net.URI;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.text.MessageFormat;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.cloud.function.context.FunctionCatalog;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.RequestEntity;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.web.client.RestTemplate;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author Oleg Zhurakousky
|
||||
* @since 3.1.1
|
||||
*
|
||||
*/
|
||||
@Configuration
|
||||
@ConditionalOnProperty("AWS_LAMBDA_RUNTIME_API")
|
||||
public class CustomRuntimeEventLoop {
|
||||
|
||||
private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class);
|
||||
|
||||
private static final String LAMBDA_VERSION_DATE = "2018-06-01";
|
||||
private static final String LAMBDA_RUNTIME_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/next";
|
||||
private static final String LAMBDA_INVOCATION_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/response";
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("AWS_LAMBDA_RUNTIME_API")
|
||||
public CommandLineRunner backgrounder(ApplicationContext applicationContext) {
|
||||
return args -> eventLoop(applicationContext);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
static void eventLoop(ApplicationContext context) {
|
||||
logger.info("Starting spring-cloud-function CustomRuntimeEventLoop");
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("AWS LAMBDA ENVIRONMENT: " + System.getenv());
|
||||
}
|
||||
|
||||
String runtimeApi = System.getenv("AWS_LAMBDA_RUNTIME_API");
|
||||
String eventUri = MessageFormat.format(LAMBDA_RUNTIME_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Event URI: " + eventUri);
|
||||
}
|
||||
|
||||
RequestEntity<Void> requestEntity = RequestEntity.get(URI.create(eventUri)).build();
|
||||
FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class);
|
||||
RestTemplate rest = new RestTemplate();
|
||||
JsonMapper mapper = context.getBean(JsonMapper.class);
|
||||
|
||||
logger.info("Entering event loop");
|
||||
while (true) {
|
||||
logger.debug("Attempting to get new event");
|
||||
ResponseEntity<String> response = rest.exchange(requestEntity, String.class);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("New Event received: " + response.getBody());
|
||||
}
|
||||
|
||||
FunctionInvocationWrapper function = locateFunction(functionCatalog, response.getHeaders().getContentType());
|
||||
|
||||
Message<byte[]> eventMessage = AWSLambdaUtils.generateMessage(response.getBody().getBytes(StandardCharsets.UTF_8),
|
||||
fromHttp(response.getHeaders()), function, mapper);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Event message: " + eventMessage);
|
||||
}
|
||||
|
||||
String requestId = response.getHeaders().getFirst("Lambda-Runtime-Aws-Request-Id");
|
||||
String invocationUrl = MessageFormat
|
||||
.format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
|
||||
|
||||
Message<byte[]> responseMessage = (Message<byte[]>) function.apply(eventMessage);
|
||||
|
||||
String reply = new String(responseMessage.getPayload(), StandardCharsets.UTF_8);
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Reply from function: " + reply);
|
||||
}
|
||||
|
||||
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper);
|
||||
ResponseEntity<Object> result = rest
|
||||
.exchange(RequestEntity.post(URI.create(invocationUrl)).body(outputBody), Object.class);
|
||||
|
||||
if (logger.isInfoEnabled()) {
|
||||
logger.info("Result POST status: " + result.getStatusCode());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static FunctionInvocationWrapper locateFunction(FunctionCatalog functionCatalog, MediaType contentType) {
|
||||
String handlerName = System.getenv("_HANDLER");
|
||||
FunctionInvocationWrapper function = functionCatalog.lookup(handlerName, contentType.toString());
|
||||
if (function == null) {
|
||||
handlerName = System.getenv("spring.cloud.function.definition");
|
||||
}
|
||||
function = functionCatalog.lookup(handlerName, contentType.toString());
|
||||
Assert.notNull(function, "Failed to locate function. Tried locating default function, "
|
||||
+ "function by '_HANDLER' env variable as well as'spring.cloud.function.definition'.");
|
||||
if (function != null && logger.isInfoEnabled()) {
|
||||
logger.info("Located function " + function.getFunctionDefinition());
|
||||
}
|
||||
return function;
|
||||
}
|
||||
|
||||
private static MessageHeaders fromHttp(HttpHeaders headers) {
|
||||
Map<String, Object> map = new LinkedHashMap<>();
|
||||
for (String name : headers.keySet()) {
|
||||
Collection<?> values = multi(headers.get(name));
|
||||
name = name.toLowerCase();
|
||||
Object value = values == null ? null
|
||||
: (values.size() == 1 ? values.iterator().next() : values);
|
||||
if (name.toLowerCase().equals(HttpHeaders.CONTENT_TYPE.toLowerCase())) {
|
||||
name = MessageHeaders.CONTENT_TYPE;
|
||||
}
|
||||
map.put(name, value);
|
||||
}
|
||||
return new MessageHeaders(map);
|
||||
}
|
||||
|
||||
private static Collection<?> multi(Object value) {
|
||||
return value instanceof Collection ? (Collection<?>) value : Arrays.asList(value);
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,7 @@ import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
* @author Oleg Zhurakousky
|
||||
*/
|
||||
@Order(0)
|
||||
public class CustomRuntimeInitializer implements ApplicationContextInitializer<GenericApplicationContext> {
|
||||
@@ -35,9 +36,14 @@ public class CustomRuntimeInitializer implements ApplicationContextInitializer<G
|
||||
Boolean enabled = context.getEnvironment().getProperty("spring.cloud.function.web.export.enabled",
|
||||
Boolean.class);
|
||||
if (enabled == null || !enabled) {
|
||||
return;
|
||||
if (StringUtils.hasText(System.getenv("AWS_LAMBDA_RUNTIME_API"))) {
|
||||
if (context.getBeanFactory().getBeanNamesForType(CustomRuntimeEventLoop.class, false, false).length == 0) {
|
||||
context.registerBean(StringUtils.uncapitalize(CustomRuntimeEventLoop.class.getSimpleName()),
|
||||
CommandLineRunner.class, () -> args -> CustomRuntimeEventLoop.eventLoop(context));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (ContextFunctionCatalogInitializer.enabled
|
||||
else if (ContextFunctionCatalogInitializer.enabled
|
||||
&& context.getEnvironment().getProperty("spring.functional.enabled", Boolean.class, false)) {
|
||||
if (context.getBeanFactory().getBeanNamesForType(DestinationResolver.class, false, false).length == 0) {
|
||||
context.registerBean(LambdaDestinationResolver.class, () -> new LambdaDestinationResolver());
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
org.springframework.cloud.function.adapter.aws.CustomRuntimeAutoConfiguration
|
||||
org.springframework.cloud.function.adapter.aws.CustomRuntimeEventLoop
|
||||
org.springframework.context.ApplicationContextInitializer=\
|
||||
org.springframework.cloud.function.adapter.aws.CustomRuntimeInitializer
|
||||
org.springframework.boot.env.EnvironmentPostProcessor=\
|
||||
|
||||
Reference in New Issue
Block a user