diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java index 5ed017389..4fb5bbf30 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/AWSLambdaUtils.java @@ -122,6 +122,9 @@ final class AWSLambdaUtils { } } } + else if (request instanceof Iterable) { + messageBuilder = MessageBuilder.withPayload(request); + } if (messageBuilder == null) { messageBuilder = MessageBuilder.withPayload(payload); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java index 554265398..974593526 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java @@ -28,13 +28,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.springframework.boot.CommandLineRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.RequestEntity; @@ -52,9 +48,7 @@ import org.springframework.web.client.RestTemplate; * @since 3.1.1 * */ -@Configuration -@ConditionalOnProperty("AWS_LAMBDA_RUNTIME_API") -public class CustomRuntimeEventLoop { +final class CustomRuntimeEventLoop { private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class); @@ -62,10 +56,7 @@ public class CustomRuntimeEventLoop { private static final String LAMBDA_RUNTIME_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/next"; private static final String LAMBDA_INVOCATION_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/response"; - @Bean - @ConditionalOnProperty("AWS_LAMBDA_RUNTIME_API") - public CommandLineRunner backgrounder(ApplicationContext applicationContext) { - return args -> eventLoop(applicationContext); + private CustomRuntimeEventLoop() { } @SuppressWarnings("unchecked") @@ -124,12 +115,16 @@ public class CustomRuntimeEventLoop { private static FunctionInvocationWrapper locateFunction(FunctionCatalog functionCatalog, MediaType contentType) { String handlerName = System.getenv("DEFAULT_HANDLER"); FunctionInvocationWrapper function = functionCatalog.lookup(handlerName, contentType.toString()); + if (function == null) { + handlerName = System.getenv("_HANDLER"); + } + function = functionCatalog.lookup(handlerName, contentType.toString()); if (function == null) { handlerName = System.getenv("spring.cloud.function.definition"); } function = functionCatalog.lookup(handlerName, contentType.toString()); Assert.notNull(function, "Failed to locate function. Tried locating default function, " - + "function by '_HANDLER' env variable as well as'spring.cloud.function.definition'."); + + "function by 'DEFAULT_HANDLER', '_HANDLER' env variable as well as'spring.cloud.function.definition'."); if (function != null && logger.isInfoEnabled()) { logger.info("Located function " + function.getFunctionDefinition()); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java index 7e85a8ac8..10c0c1ecd 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java @@ -47,7 +47,8 @@ public class CustomRuntimeInitializer implements ApplicationContextInitializer args -> CustomRuntimeEventLoop.eventLoop(context)); } } - else if (ContextFunctionCatalogInitializer.enabled + else + if (ContextFunctionCatalogInitializer.enabled && context.getEnvironment().getProperty("spring.functional.enabled", Boolean.class, false)) { if (context.getBeanFactory().getBeanNamesForType(DestinationResolver.class, false, false).length == 0) { context.registerBean(LambdaDestinationResolver.class, () -> new LambdaDestinationResolver()); @@ -60,6 +61,8 @@ public class CustomRuntimeInitializer implements ApplicationContextInitializer clazz = Thread.currentThread().getContextClassLoader().loadClass(handler); if (FunctionInvoker.class.isAssignableFrom(clazz) || AbstractSpringFunctionAdapterInitializer.class.isAssignableFrom(clazz)) { diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index 833c4b75d..6e07d742e 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -19,21 +19,18 @@ package org.springframework.cloud.function.adapter.aws; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Calendar; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Collections; -import java.util.Date; +import java.util.List; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.MapperFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.datatype.joda.JodaModule; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; import org.springframework.boot.SpringApplication; import org.springframework.cloud.function.context.FunctionCatalog; @@ -43,6 +40,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.Environment; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.StreamUtils; import org.springframework.util.StringUtils; @@ -67,33 +65,70 @@ public class FunctionInvoker implements RequestStreamHandler { this.start(); } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings("rawtypes") @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { final byte[] payload = StreamUtils.copyToByteArray(input); + + if (logger.isInfoEnabled()) { + logger.info("Received: " + new String(payload, StandardCharsets.UTF_8)); + } + Message requestMessage = AWSLambdaUtils .generateMessage(payload, new MessageHeaders(Collections.emptyMap()), function.getInputType(), this.objectMapper, context); - Message responseMessage = (Message) this.function.apply(requestMessage); - - byte[] responseBytes = AWSLambdaUtils.generateOutput(requestMessage, responseMessage, this.objectMapper); + Object response = this.function.apply(requestMessage); + byte[] responseBytes = this.buildResult(requestMessage, response); StreamUtils.copy(responseBytes, output); } + @SuppressWarnings("unchecked") + private byte[] buildResult(Message requestMessage, Object output) throws IOException { + Message responseMessage; + if (output instanceof Publisher) { + List result = new ArrayList<>(); + for (Object value : Flux.from((Publisher) output).toIterable()) { + if (logger.isInfoEnabled()) { + logger.info("Response value: " + value); + } + result.add(value); + } + if (result.size() > 1) { + output = result; + } + else { + output = result.get(0); + } + + if (logger.isInfoEnabled()) { + logger.info("OUTPUT: " + output + " - " + output.getClass().getName()); + } + + byte[] payload = this.objectMapper.writeValueAsBytes(output); + responseMessage = MessageBuilder.withPayload(payload).build(); + } + else { + responseMessage = (Message) output; + } + return AWSLambdaUtils.generateOutput(requestMessage, responseMessage, this.objectMapper); + } + private void start() { ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass()); Environment environment = context.getEnvironment(); String functionName = environment.getProperty("spring.cloud.function.definition"); FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class); this.objectMapper = context.getBean(ObjectMapper.class); - //this.configureObjectMapper(); if (logger.isInfoEnabled()) { logger.info("Locating function: '" + functionName + "'"); } this.function = functionCatalog.lookup(functionName, "application/json"); + if (this.function.isOutputTypePublisher()) { + this.function.setSkipOutputConversion(true); + } Assert.notNull(this.function, "Failed to lookup function " + functionName); if (!StringUtils.hasText(functionName)) { @@ -104,20 +139,4 @@ public class FunctionInvoker implements RequestStreamHandler { logger.info("Located function: '" + functionName + "'"); } } - - private void configureObjectMapper() { - SimpleModule module = new SimpleModule(); - module.addDeserializer(Date.class, new JsonDeserializer() { - @Override - public Date deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException { - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(jsonParser.getValueAsLong()); - return calendar.getTime(); - } - }); - this.objectMapper.registerModule(module); - this.objectMapper.registerModule(new JodaModule()); - this.objectMapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true); - } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java index 815d5757e..0953a07e1 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/FunctionInvokerTests.java @@ -32,6 +32,7 @@ import com.amazonaws.services.lambda.runtime.events.SNSEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.context.annotation.Bean; @@ -51,6 +52,8 @@ public class FunctionInvokerTests { ObjectMapper mapper = new ObjectMapper(); + String jsonCollection = "[\"Ricky\",\"Julien\",\"Bubbles\"]"; + String sampleLBEvent = "{" + " \"requestContext\": {" + " \"elb\": {" + @@ -360,6 +363,20 @@ public class FunctionInvokerTests { " \"isBase64Encoded\": false\n" + "}"; + + @Test + public void testCollection() throws Exception { + System.setProperty("MAIN_CLASS", SampleConfiguration.class.getName()); + System.setProperty("spring.cloud.function.definition", "echoStringReactive"); + FunctionInvoker invoker = new FunctionInvoker(); + + InputStream targetStream = new ByteArrayInputStream(this.jsonCollection.getBytes()); + ByteArrayOutputStream output = new ByteArrayOutputStream(); + invoker.handleRequest(targetStream, output, null); + String result = new String(output.toByteArray(), StandardCharsets.UTF_8); + assertThat(result).isEqualTo(this.jsonCollection); + } + @Test public void testKinesisStringEvent() throws Exception { System.setProperty("MAIN_CLASS", KinesisConfiguration.class.getName()); @@ -689,6 +706,20 @@ public class FunctionInvokerTests { assertThat(result.get("body")).isEqualTo("\"OK\""); } + @EnableAutoConfiguration + @Configuration + public static class SampleConfiguration { + @Bean + public Function echoString() { + return v -> v; + } + + @Bean + public Function, Flux> echoStringReactive() { + return v -> v; + } + } + @EnableAutoConfiguration @Configuration public static class KinesisConfiguration { diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index e38c27b67..433f105e1 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -664,11 +664,27 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect /* * */ + @SuppressWarnings("unchecked") private Object fluxifyInputIfNecessary(Object input) { if (!(input instanceof Publisher) && this.isTypePublisher(this.inputType) && !FunctionTypeUtils.isMultipleArgumentType(this.inputType)) { - return input == null - ? FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty() - : FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.just(input); + if (input == null) { + input = FunctionTypeUtils.isMono(this.inputType) ? Mono.empty() : Flux.empty(); + } + else if (input instanceof Message && ((Message) input).getPayload() instanceof Iterable) { + input = FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.just(input).flatMap(v -> { + if (logger.isDebugEnabled()) { + logger.debug("Creating Flux from Iterable: " + ((Message) v).getPayload()); + } + return Flux.fromIterable((Iterable) ((Message) v).getPayload()); + }); + } + else if (input instanceof Iterable) { + input = FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.fromIterable((Iterable) input); + + } + else { + input = FunctionTypeUtils.isMono(this.inputType) ? Mono.just(input) : Flux.just(input); + } } return input; } @@ -1050,6 +1066,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } if (message.getPayload() instanceof Collection) { Type itemType = FunctionTypeUtils.getImmediateGenericType(type, 0); + if (itemType == null) { + itemType = type; + } Type collectionType = CollectionUtils.findCommonElementType((Collection) message.getPayload()); if (collectionType == itemType) { return message.getPayload(); diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java index 488794f87..f7156eb14 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/catalog/BeanFactoryAwareFunctionRegistryTests.java @@ -20,6 +20,7 @@ package org.springframework.cloud.function.context.catalog; import java.io.Serializable; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Date; @@ -593,6 +594,24 @@ public class BeanFactoryAwareFunctionRegistryTests { assertThat(resultList).isEmpty(); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testArrayPayloadOnFluxFunction() throws Exception { + FunctionCatalog catalog = this.configureCatalog(SampleFunctionConfiguration.class); + FunctionInvocationWrapper lmFunction = catalog.lookup("uppercaseFlux", "application/json"); + lmFunction.setSkipOutputConversion(true); + List list = new ArrayList<>(); + list.add("Ricky"); + list.add("Julien"); + list.add("Bubbles"); + Publisher p = (Publisher) lmFunction.apply(MessageBuilder.withPayload(list).setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build()); + List result = new ArrayList<>(); + for (Object value : Flux.from(p).toIterable()) { + result.add(value); + } + assertThat(result.size()).isEqualTo(3); + } + @EnableAutoConfiguration