diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java index fdb099255..3f3da31e7 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java @@ -56,7 +56,7 @@ public final class AWSLambdaUtils { } static boolean isSupportedAWSType(Type inputType) { - if (FunctionTypeUtils.isMessage(inputType)) { + if (FunctionTypeUtils.isMessage(inputType) || FunctionTypeUtils.isPublisher(inputType)) { inputType = FunctionTypeUtils.getImmediateGenericType(inputType, 0); } String typeName = inputType.getTypeName(); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index a2273be2a..f188df8d2 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -690,12 +690,6 @@ public class FunctionInvokerTests { @Test public void testS3Event() throws Exception { - -// S3EventSerializer ser = new S3EventSerializer().withClass(S3Event.class).withClassLoader(S3Event.class.getClassLoader()); -// InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); -// S3Event event = ser.fromJson(targetStream); -// System.out.println(event); - System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); System.setProperty("spring.cloud.function.definition", "inputS3Event"); FunctionInvoker invoker = new FunctionInvoker(); @@ -751,6 +745,20 @@ public class FunctionInvokerTests { assertThat(result.get("body")).isEqualTo("\"Hello from ELB\""); } + @Test + public void testS3EventReactive() throws Exception { + System.setProperty("MAIN_CLASS", S3Configuration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoStringFlux"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.s3Event.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).contains("s3SchemaVersion"); + } + @Test public void testLBEvent() throws Exception { System.setProperty("MAIN_CLASS", LBConfiguration.class.getName()); @@ -1276,6 +1284,11 @@ public class FunctionInvokerTests { return v -> v; } + @Bean + public Function, Flux> echoStringFlux() { + return v -> v; + } + @Bean public Function inputS3Event(JsonMapper jsonMapper) { return v -> {