diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java index 42d6e2fce..43cec472d 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java @@ -16,12 +16,15 @@ package org.springframework.cloud.function.adapter.aws; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.SocketException; import java.net.URI; import java.nio.charset.StandardCharsets; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -30,6 +33,7 @@ import java.util.concurrent.Executors; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.cloud.function.json.JsonMapper; @@ -45,6 +49,8 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.web.client.RestTemplate; + + import static org.apache.http.HttpHeaders.USER_AGENT; /** @@ -61,6 +67,7 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle { private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class); static final String LAMBDA_VERSION_DATE = "2018-06-01"; + private static final String LAMBDA_ERROR_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/error"; private static final String LAMBDA_RUNTIME_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/next"; private static final String LAMBDA_INVOCATION_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/response"; private static final String USER_AGENT_VALUE = String.format( @@ -141,19 +148,46 @@ public final class CustomRuntimeEventLoop implements SmartLifecycle { String invocationUrl = MessageFormat .format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId); - Message responseMessage = (Message) function.apply(eventMessage); + try { + Message responseMessage = (Message) function.apply(eventMessage); - if (responseMessage != null && logger.isDebugEnabled()) { - logger.debug("Reply from function: " + responseMessage); + if (responseMessage != null && logger.isDebugEnabled()) { + logger.debug("Reply from function: " + responseMessage); + } + + byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType()); + ResponseEntity result = rest.exchange(RequestEntity.post(URI.create(invocationUrl)) + .header(USER_AGENT, USER_AGENT_VALUE) + .body(outputBody), Object.class); + + if (logger.isInfoEnabled()) { + logger.info("Result POST status: " + result.getStatusCode()); + } } - - byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType()); - ResponseEntity result = rest.exchange(RequestEntity.post(URI.create(invocationUrl)) - .header(USER_AGENT, USER_AGENT_VALUE) - .body(outputBody), Object.class); - - if (logger.isInfoEnabled()) { - logger.info("Result POST status: " + result.getStatusCode()); + catch (Exception e) { + String errorMessage = e.getMessage(); + String errorType = e.getClass().getSimpleName(); + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + e.printStackTrace(pw); + String stackTrace = sw.toString(); + Map em = new HashMap<>(); + em.put("errorMessage", errorMessage); + em.put("errorType", errorType); + em.put("stackTrace", stackTrace); + byte[] outputBody = mapper.toJson(em); + try { + String errorUrl = MessageFormat.format(LAMBDA_ERROR_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId); + ResponseEntity result = rest.exchange(RequestEntity.post(URI.create(errorUrl)) + .header(USER_AGENT, USER_AGENT_VALUE) + .body(outputBody), Object.class); + if (logger.isInfoEnabled()) { + logger.info("Result ERROR status: " + result.getStatusCode()); + } + } + catch (Exception e2) { + throw new IllegalArgumentException("Failed to report error", e2); + } } } } diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java b/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java index 730053793..cea0dfab7 100644 --- a/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java @@ -21,6 +21,9 @@ public class LambdaApplication public Function uppercase() { return value -> { logger.info("Processing: " + value); + if (value.equals("error")) { + throw new IllegalArgumentException("Intentional"); + } return value.toUpperCase(); }; } diff --git a/spring-cloud-function-samples/function-sample/src/test/java/com/example/WebTestClientTests.java b/spring-cloud-function-samples/function-sample/src/test/java/com/example/WebTestClientTests.java index c74216e7b..9e4cfc703 100644 --- a/spring-cloud-function-samples/function-sample/src/test/java/com/example/WebTestClientTests.java +++ b/spring-cloud-function-samples/function-sample/src/test/java/com/example/WebTestClientTests.java @@ -19,12 +19,6 @@ public class WebTestClientTests { @Autowired private WebTestClient client; - @Test - public void words() { - client.get().uri("/words").exchange() - .expectStatus().isOk().expectBody(String.class).isEqualTo("[\"foo\",\"bar\"]"); - } - @Test public void uppercase() { client.post().uri("/uppercase").body(Mono.just("foo"), String.class).exchange()