GH-841 Add support for propagating errors from AWS Custom Runtime

Resolves #841
This commit is contained in:
Oleg Zhurakousky
2022-04-20 19:56:36 +02:00
parent bbac4b198e
commit 2dfd1f7eae
2 changed files with 48 additions and 11 deletions

View File

@@ -16,12 +16,15 @@
package org.springframework.cloud.function.adapter.aws;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.SocketException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -30,6 +33,7 @@ import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.cloud.function.json.JsonMapper;
@@ -45,6 +49,8 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.web.client.RestTemplate;
import static org.apache.http.HttpHeaders.USER_AGENT;
/**
@@ -62,6 +68,7 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle {
private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class);
static final String LAMBDA_VERSION_DATE = "2018-06-01";
private static final String LAMBDA_ERROR_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/error";
private static final String LAMBDA_RUNTIME_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/next";
private static final String LAMBDA_INVOCATION_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/response";
private static final String USER_AGENT_VALUE = String.format(
@@ -142,19 +149,46 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle {
String invocationUrl = MessageFormat
.format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
Message<byte[]> responseMessage = (Message<byte[]>) function.apply(eventMessage);
try {
Message<byte[]> responseMessage = (Message<byte[]>) function.apply(eventMessage);
if (responseMessage != null && logger.isDebugEnabled()) {
logger.debug("Reply from function: " + responseMessage);
if (responseMessage != null && logger.isDebugEnabled()) {
logger.debug("Reply from function: " + responseMessage);
}
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType());
ResponseEntity<Object> result = rest.exchange(RequestEntity.post(URI.create(invocationUrl))
.header(USER_AGENT, USER_AGENT_VALUE)
.body(outputBody), Object.class);
if (logger.isInfoEnabled()) {
logger.info("Result POST status: " + result.getStatusCode());
}
}
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType());
ResponseEntity<Object> result = rest.exchange(RequestEntity.post(URI.create(invocationUrl))
.header(USER_AGENT, USER_AGENT_VALUE)
.body(outputBody), Object.class);
if (logger.isInfoEnabled()) {
logger.info("Result POST status: " + result.getStatusCode());
catch (Exception e) {
String errorMessage = e.getMessage();
String errorType = e.getClass().getSimpleName();
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
e.printStackTrace(pw);
String stackTrace = sw.toString();
Map<String, String> em = new HashMap<>();
em.put("errorMessage", errorMessage);
em.put("errorType", errorType);
em.put("stackTrace", stackTrace);
byte[] outputBody = mapper.toJson(em);
try {
String errorUrl = MessageFormat.format(LAMBDA_ERROR_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
ResponseEntity<Object> result = rest.exchange(RequestEntity.post(URI.create(errorUrl))
.header(USER_AGENT, USER_AGENT_VALUE)
.body(outputBody), Object.class);
if (logger.isInfoEnabled()) {
logger.info("Result ERROR status: " + result.getStatusCode());
}
}
catch (Exception e2) {
throw new IllegalArgumentException("Failed to report error", e2);
}
}
}
}