@@ -29,7 +29,9 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.Context;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2HTTPEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayV2HTTPResponse;
|
||||
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.S3Event;
|
||||
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
|
||||
@@ -163,7 +165,13 @@ final class AWSLambdaUtils {
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
public static byte[] generateOutput(Message requestMessage, Message<byte[]> responseMessage,
|
||||
ObjectMapper objectMapper) {
|
||||
ObjectMapper objectMapper, Type functionOutputType) {
|
||||
|
||||
Class<?> outputClass = FunctionTypeUtils.getRawType(functionOutputType);
|
||||
if (outputClass != null && (APIGatewayV2HTTPResponse.class.isAssignableFrom(outputClass)
|
||||
|| APIGatewayProxyResponseEvent.class.isAssignableFrom(outputClass))) {
|
||||
return responseMessage.getPayload();
|
||||
}
|
||||
|
||||
|
||||
if (!objectMapper.isEnabled(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES)) {
|
||||
@@ -190,7 +198,7 @@ final class AWSLambdaUtils {
|
||||
}
|
||||
|
||||
String body = responseMessage == null
|
||||
? "\"OK\"" : new String(responseMessage.getPayload(), StandardCharsets.UTF_8).replaceAll("\\\"", "\"");
|
||||
? "\"OK\"" : new String(responseMessage.getPayload(), StandardCharsets.UTF_8).replaceAll("\\\"", "");
|
||||
response.put("body", body);
|
||||
|
||||
if (responseMessage != null) {
|
||||
|
||||
@@ -102,7 +102,7 @@ final class CustomRuntimeEventLoop {
|
||||
logger.debug("Reply from function: " + responseMessage);
|
||||
}
|
||||
|
||||
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper);
|
||||
byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType());
|
||||
ResponseEntity<Object> result = rest
|
||||
.exchange(RequestEntity.post(URI.create(invocationUrl)).body(outputBody), Object.class);
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ import java.util.List;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.Context;
|
||||
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@@ -39,6 +40,7 @@ import org.springframework.cloud.function.context.config.RoutingFunction;
|
||||
import org.springframework.cloud.function.utils.FunctionClassUtils;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.core.env.Environment;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
@@ -79,10 +81,22 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
Message requestMessage = AWSLambdaUtils
|
||||
.generateMessage(payload, new MessageHeaders(Collections.emptyMap()), function.getInputType(), this.objectMapper, context);
|
||||
|
||||
Object response = this.function.apply(requestMessage);
|
||||
try {
|
||||
Object response = this.function.apply(requestMessage);
|
||||
byte[] responseBytes = this.buildResult(requestMessage, response);
|
||||
StreamUtils.copy(responseBytes, output);
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.error(e);
|
||||
StreamUtils.copy(this.buildExceptionResult(requestMessage, e), output);
|
||||
}
|
||||
}
|
||||
|
||||
byte[] responseBytes = this.buildResult(requestMessage, response);
|
||||
StreamUtils.copy(responseBytes, output);
|
||||
private byte[] buildExceptionResult(Message<?> requestMessage, Exception exception) throws IOException {
|
||||
APIGatewayProxyResponseEvent event = new APIGatewayProxyResponseEvent();
|
||||
event.setStatusCode(HttpStatus.EXPECTATION_FAILED.value());
|
||||
event.setBody(exception.getMessage());
|
||||
return this.objectMapper.writeValueAsBytes(event);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@@ -113,7 +127,7 @@ public class FunctionInvoker implements RequestStreamHandler {
|
||||
else {
|
||||
responseMessage = (Message<byte[]>) output;
|
||||
}
|
||||
return AWSLambdaUtils.generateOutput(requestMessage, responseMessage, this.objectMapper);
|
||||
return AWSLambdaUtils.generateOutput(requestMessage, responseMessage, this.objectMapper, function.getOutputType());
|
||||
}
|
||||
|
||||
private void start() {
|
||||
|
||||
Reference in New Issue
Block a user