GH-913 Fix AWS Context propagation

Resolves #913
This commit is contained in:
Oleg Zhurakousky
2022-08-08 15:12:51 +02:00
parent 1a8ea1968e
commit 16eeb77448
4 changed files with 131 additions and 8 deletions

View File

@@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import com.amazonaws.services.lambda.runtime.Context;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,8 +66,12 @@ final class AWSLambdaUtils {
|| typeName.equals("com.amazonaws.services.lambda.runtime.events.KinesisEvent");
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Message<byte[]> generateMessage(byte[] payload, Type inputType, boolean isSupplier, JsonMapper jsonMapper) {
return generateMessage(payload, inputType, isSupplier, jsonMapper, null);
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Message<byte[]> generateMessage(byte[] payload, Type inputType, boolean isSupplier, JsonMapper jsonMapper, Context context) {
if (logger.isInfoEnabled()) {
logger.info("Received: " + new String(payload, StandardCharsets.UTF_8));
}
@@ -84,6 +89,9 @@ final class AWSLambdaUtils {
if (!isSupplier && AWSLambdaUtils.isSupportedAWSType(inputType)) {
builder.setHeader(AWSLambdaUtils.AWS_EVENT, true);
}
if (context != null) {
builder.setHeader(AWSLambdaUtils.AWS_CONTEXT, context);
}
//
if (structMessage instanceof Map && ((Map<String, Object>) structMessage).containsKey("headers")) {
builder.copyHeaders((Map<String, Object>) ((Map<String, Object>) structMessage).get("headers"));

View File

@@ -80,7 +80,7 @@ public class FunctionInvoker implements RequestStreamHandler {
@Override
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
Message requestMessage = AWSLambdaUtils
.generateMessage(StreamUtils.copyToByteArray(input), this.function.getInputType(), this.function.isSupplier(), jsonMapper);
.generateMessage(StreamUtils.copyToByteArray(input), this.function.getInputType(), this.function.isSupplier(), jsonMapper, context);
Object response = this.function.apply(requestMessage);
byte[] responseBytes = this.buildResult(requestMessage, response);

View File

@@ -18,7 +18,9 @@ package org.springframework.cloud.function.adapter.aws;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
@@ -26,6 +28,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayCustomAuthorizerEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
@@ -504,11 +507,16 @@ public class FunctionInvokerTests {
public void testKinesisEvent() throws Exception {
System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputKinesisEvent");
FunctionInvoker invoker = new FunctionInvoker();
FunctionInvoker invoker = new FunctionInvoker() {
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
assertThat(context).isNotNull();
super.handleRequest(input, output, context);
}
};
InputStream targetStream = new ByteArrayInputStream(this.sampleKinesisEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
invoker.handleRequest(targetStream, output, new TestContext());
String result = new String(output.toByteArray(), StandardCharsets.UTF_8);
assertThat(result).contains("49590338271490256608559692538361571095921575989136588898");
@@ -760,11 +768,16 @@ public class FunctionInvokerTests {
public void testLBEventInOut() throws Exception {
System.setProperty("MAIN_CLASS", LBConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputOutputLBEvent");
FunctionInvoker invoker = new FunctionInvoker();
FunctionInvoker invoker = new FunctionInvoker() {
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
assertThat(context).isNotNull();
super.handleRequest(input, output, context);
}
};
InputStream targetStream = new ByteArrayInputStream(this.sampleLBEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
invoker.handleRequest(targetStream, output, new TestContext());
Map result = mapper.readValue(output.toByteArray(), Map.class);
assertThat(result.get("body")).isEqualTo("Hello from ELB");
@@ -842,11 +855,16 @@ public class FunctionInvokerTests {
public void testApiGatewayEvent() throws Exception {
System.setProperty("MAIN_CLASS", ApiGatewayConfiguration.class.getName());
System.setProperty("spring.cloud.function.definition", "inputApiEvent");
FunctionInvoker invoker = new FunctionInvoker();
FunctionInvoker invoker = new FunctionInvoker() {
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
assertThat(context).isNotNull();
super.handleRequest(input, output, context);
}
};
InputStream targetStream = new ByteArrayInputStream(this.apiGatewayEvent.getBytes());
ByteArrayOutputStream output = new ByteArrayOutputStream();
invoker.handleRequest(targetStream, output, null);
invoker.handleRequest(targetStream, output, new TestContext());
Map result = mapper.readValue(output.toByteArray(), Map.class);
System.out.println(result);

View File

@@ -0,0 +1,97 @@
/*
* Copyright 2019-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.adapter.aws;
import com.amazonaws.services.lambda.runtime.ClientContext;
import com.amazonaws.services.lambda.runtime.CognitoIdentity;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
/**
*
* @author Oleg Zhurakousky
*
*/
public class TestContext implements Context {
@Override
public String getAwsRequestId() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getLogGroupName() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getLogStreamName() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getFunctionName() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getFunctionVersion() {
// TODO Auto-generated method stub
return null;
}
@Override
public String getInvokedFunctionArn() {
// TODO Auto-generated method stub
return null;
}
@Override
public CognitoIdentity getIdentity() {
// TODO Auto-generated method stub
return null;
}
@Override
public ClientContext getClientContext() {
// TODO Auto-generated method stub
return null;
}
@Override
public int getRemainingTimeInMillis() {
// TODO Auto-generated method stub
return 0;
}
@Override
public int getMemoryLimitInMB() {
// TODO Auto-generated method stub
return 0;
}
@Override
public LambdaLogger getLogger() {
// TODO Auto-generated method stub
return null;
}
}