diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml index 72626ba0b..050752c41 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml @@ -98,6 +98,33 @@ ${aws-kinesis-aggregator.version} test + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + + org.springframework.boot + spring-boot-starter-web + test + + + + + + + + + + + diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java index 974593526..b235091b4 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java @@ -52,7 +52,7 @@ final class CustomRuntimeEventLoop { private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class); - private static final String LAMBDA_VERSION_DATE = "2018-06-01"; + static final String LAMBDA_VERSION_DATE = "2018-06-01"; private static final String LAMBDA_RUNTIME_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/next"; private static final String LAMBDA_INVOCATION_URL_TEMPLATE = "http://{0}/{1}/runtime/invocation/{2}/response"; @@ -78,11 +78,11 @@ final class CustomRuntimeEventLoop { ObjectMapper mapper = context.getBean(ObjectMapper.class); logger.info("Entering event loop"); - while (true) { + while (isContinue()) { logger.debug("Attempting to get new event"); ResponseEntity response = rest.exchange(requestEntity, String.class); if (logger.isDebugEnabled()) { - logger.debug("New Event received: " + response.getBody()); + logger.debug("New Event received: " + response); } FunctionInvocationWrapper function = locateFunction(functionCatalog, response.getHeaders().getContentType()); @@ -99,7 +99,7 @@ final class CustomRuntimeEventLoop { Message responseMessage = (Message) function.apply(eventMessage); if (responseMessage != null && logger.isDebugEnabled()) { - logger.debug("Reply from function: " + new String(responseMessage.getPayload(), StandardCharsets.UTF_8)); + logger.debug("Reply from function: " + responseMessage); } byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper); @@ -112,19 +112,34 @@ final class CustomRuntimeEventLoop { } } + private static boolean isContinue() { + return Boolean.parseBoolean(System.getProperty("CustomRuntimeEventLoop.continue", "true")); + } + private static FunctionInvocationWrapper locateFunction(FunctionCatalog functionCatalog, MediaType contentType) { String handlerName = System.getenv("DEFAULT_HANDLER"); FunctionInvocationWrapper function = functionCatalog.lookup(handlerName, contentType.toString()); if (function == null) { handlerName = System.getenv("_HANDLER"); + function = functionCatalog.lookup(handlerName, contentType.toString()); } - function = functionCatalog.lookup(handlerName, contentType.toString()); + + if (function == null) { + function = functionCatalog.lookup(null, contentType.toString()); + } + if (function == null) { handlerName = System.getenv("spring.cloud.function.definition"); + function = functionCatalog.lookup(handlerName, contentType.toString()); } - function = functionCatalog.lookup(handlerName, contentType.toString()); + + if (function == null) { + function = functionCatalog.lookup(null, contentType.toString()); + } + Assert.notNull(function, "Failed to locate function. Tried locating default function, " - + "function by 'DEFAULT_HANDLER', '_HANDLER' env variable as well as'spring.cloud.function.definition'."); + + "function by 'DEFAULT_HANDLER', '_HANDLER' env variable as well as'spring.cloud.function.definition'. " + + "Functions available in catalog are: " + functionCatalog.getNames(null)); if (function != null && logger.isInfoEnabled()) { logger.info("Located function " + function.getFunctionDefinition()); } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java index 6e07d742e..97c339ef3 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/FunctionInvoker.java @@ -115,7 +115,8 @@ public class FunctionInvoker implements RequestStreamHandler { } private void start() { - ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass()); + System.out.println(FunctionClassUtils.getStartClass().getName()); + ConfigurableApplicationContext context = SpringApplication.run(FunctionClassUtils.getStartClass(), "--spring.main.web-application-type=none"); Environment environment = context.getEnvironment(); String functionName = environment.getProperty("spring.cloud.function.definition"); FunctionCatalog functionCatalog = context.getBean(FunctionCatalog.class); diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoopTest.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoopTest.java new file mode 100644 index 000000000..809926e9b --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoopTest.java @@ -0,0 +1,288 @@ +/* + * Copyright 2021-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.WebApplicationType; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.util.MimeTypeUtils; + +import static org.assertj.core.api.Assertions.assertThat; +/** + * + * @author Oleg Zhurakousky + */ +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = "spring.main.web-application-type=servlet") +@ContextConfiguration(classes = { + CustomRuntimeEventLoopTest.CustomRuntimeEmulatorConfiguration.class }) +public class CustomRuntimeEventLoopTest { + + @LocalServerPort + private int port; + + @Autowired + private CustomRuntimeEmulatorConfiguration configuration; + + @SuppressWarnings("unchecked") + private Map getEnvironment() throws Exception { + Map env = System.getenv(); + Field field = env.getClass().getDeclaredField("m"); + field.setAccessible(true); + return (Map) field.get(env); + } + + @BeforeEach + public void before() { + System.setProperty("CustomRuntimeEventLoop.continue", "true"); + } + + @Test + @DirtiesContext + public void testDefaultFunctionLookup() throws Exception { + this.getEnvironment().put("AWS_LAMBDA_RUNTIME_API", "localhost:" + port); + + configuration.inputQueue.clear(); + configuration.inputQueue.addAll(Arrays.asList("\"ricky\"", "\"julien\"", "\"bubbles\"")); + + try (ConfigurableApplicationContext userContext = new SpringApplicationBuilder(SingleFunctionConfiguration.class) + .web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true")) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + CustomRuntimeEventLoop.eventLoop(userContext); + }); + + executor.shutdown(); + assertThat(executor.awaitTermination(2000, TimeUnit.MILLISECONDS)).isTrue(); + + assertThat(configuration.output).size().isEqualTo(3); + assertThat(configuration.output.get(0)).isEqualTo("\"RICKY\""); + assertThat(configuration.output.get(1)).isEqualTo("\"JULIEN\""); + assertThat(configuration.output.get(2)).isEqualTo("\"BUBBLES\""); + } + } + + @Test + @DirtiesContext + public void testDefaultFunctionAsComponentLookup() throws Exception { + this.getEnvironment().put("AWS_LAMBDA_RUNTIME_API", "localhost:" + port); + + configuration.inputQueue.clear(); + configuration.inputQueue.addAll(Arrays.asList("\"ricky\"", "\"julien\"", "\"bubbles\"")); + + try (ConfigurableApplicationContext userContext = new SpringApplicationBuilder(PersonFunction.class) + .web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true")) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + CustomRuntimeEventLoop.eventLoop(userContext); + }); + + executor.shutdown(); + assertThat(executor.awaitTermination(2000, TimeUnit.MILLISECONDS)).isTrue(); + + assertThat(configuration.output).size().isEqualTo(3); + assertThat(configuration.output.get(0)).isEqualTo("{\"name\":\"RICKY\"}"); + assertThat(configuration.output.get(1)).isEqualTo("{\"name\":\"JULIEN\"}"); + assertThat(configuration.output.get(2)).isEqualTo("{\"name\":\"BUBBLES\"}"); + } + } + + @Test + @DirtiesContext + public void test_HANDLERlookupAndPojoFunction() throws Exception { + this.getEnvironment().put("AWS_LAMBDA_RUNTIME_API", "localhost:" + port); + this.getEnvironment().put("_HANDLER", "uppercasePerson"); + + configuration.inputQueue.clear(); + configuration.inputQueue.addAll(Arrays.asList("{\"name\":\"ricky\"}", + "{\"name\":\"julien\"}", "{\"name\":\"bubbles\"}")); + try (ConfigurableApplicationContext userContext = new SpringApplicationBuilder(MultipleFunctionConfiguration.class) + .web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true")) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + CustomRuntimeEventLoop.eventLoop(userContext); + }); + + executor.shutdown(); + assertThat(executor.awaitTermination(2000, TimeUnit.MILLISECONDS)).isTrue(); + + assertThat(configuration.output).size().isEqualTo(3); + assertThat(configuration.output.get(0)).isEqualTo("{\"name\":\"RICKY\"}"); + assertThat(configuration.output.get(1)).isEqualTo("{\"name\":\"JULIEN\"}"); + assertThat(configuration.output.get(2)).isEqualTo("{\"name\":\"BUBBLES\"}"); + } + } + + @Test + @DirtiesContext + public void test_definitionLookupAndComposition() throws Exception { + this.getEnvironment().put("AWS_LAMBDA_RUNTIME_API", "localhost:" + port); + System.setProperty("spring.cloud.function.definition", "toPersonJson|uppercasePerson"); + + configuration.inputQueue.clear(); + configuration.inputQueue.addAll(Arrays.asList("\"ricky\"", "\"julien\"", "\"bubbles\"")); + + try (ConfigurableApplicationContext userContext = new SpringApplicationBuilder(MultipleFunctionConfiguration.class) + .web(WebApplicationType.NONE).run( + "--logging.level.org.springframework.cloud.function=DEBUG", + "--spring.main.lazy-initialization=true")) { + ExecutorService executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + CustomRuntimeEventLoop.eventLoop(userContext); + }); + + executor.shutdown(); + assertThat(executor.awaitTermination(2000, TimeUnit.MILLISECONDS)).isTrue(); + + assertThat(configuration.output).size().isEqualTo(3); + assertThat(configuration.output.get(0)).isEqualTo("{\"name\":\"RICKY\"}"); + assertThat(configuration.output.get(1)).isEqualTo("{\"name\":\"JULIEN\"}"); + assertThat(configuration.output.get(2)).isEqualTo("{\"name\":\"BUBBLES\"}"); + } + } + + @SpringBootConfiguration(proxyBeanMethods = false) + @EnableAutoConfiguration + protected static class CustomRuntimeEmulatorConfiguration { + + BlockingQueue inputQueue = new ArrayBlockingQueue<>(3); + + List output = new ArrayList<>(); + + @Bean("2018-06-01/runtime/invocation/consume/response") + public Consumer> consume() { + return v -> output.add(v.getPayload()); + } + + @Bean("2018-06-01/runtime/invocation/next") + public Supplier> supply() { + + return () -> { + try { + String value = inputQueue.poll(Long.MAX_VALUE, TimeUnit.SECONDS); + if (inputQueue.peek() == null) { + System.setProperty("CustomRuntimeEventLoop.continue", "false"); + } + return MessageBuilder.withPayload(value) + .setHeader("Lambda-Runtime-Aws-Request-Id", "consume") + .setHeader("Content-Type", + MimeTypeUtils.APPLICATION_JSON) + .build(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } + }; + } + } + + @EnableAutoConfiguration + @Configuration + protected static class SingleFunctionConfiguration { + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + } + + @EnableAutoConfiguration + @Configuration + protected static class MultipleFunctionConfiguration { + @Bean + public Function uppercase() { + return v -> v.toUpperCase(); + } + + @Bean + public Function toPersonJson() { + return v -> "{\"name\":\"" + v + "\"}"; + } + + @Bean + public Function uppercasePerson() { + return p -> new Person(p.getName().toUpperCase()); + } + } + + @EnableAutoConfiguration + @Component + public static class PersonFunction implements Function { + + @Override + public Person apply(Person input) { + return new Person(input.getName().toUpperCase()); + } + } + + public static class Person { + private String name; + + public Person() { + + } + + public Person(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + +}