diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
index 8f7545709..5d6b09cdb 100644
--- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
+++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
@@ -31,17 +31,6 @@
org.springframework.cloud
spring-cloud-function-context
-
- org.springframework.cloud
- spring-cloud-function-web
- true
-
-
-
- org.springframework
- spring-web
-
org.springframework.boot
spring-boot-starter
@@ -100,6 +89,21 @@
true
provided
+
+
+
+ org.springframework
+ spring-web
+ true
+
+
+
+ org.springframework.cloud
+ spring-cloud-function-web
+ true
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java
index dcab06728..247007b10 100644
--- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java
+++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeEventLoop.java
@@ -16,6 +16,7 @@
package org.springframework.cloud.function.adapter.aws;
+import java.net.SocketException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
@@ -23,16 +24,18 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.springframework.boot.CommandLineRunner;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
-import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.core.env.Environment;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.RequestEntity;
@@ -50,7 +53,7 @@ import org.springframework.web.client.RestTemplate;
* @since 3.1.1
*
*/
-public final class CustomRuntimeEventLoop implements CommandLineRunner {
+public final class CustomRuntimeEventLoop implements SmartLifecycle {
private static Log logger = LogFactory.getLog(CustomRuntimeEventLoop.class);
@@ -60,23 +63,30 @@ public final class CustomRuntimeEventLoop implements CommandLineRunner {
private final ConfigurableApplicationContext applicationContext;
+ private volatile boolean running;
+
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+
public CustomRuntimeEventLoop(ConfigurableApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
- @Override
- public void run(String... args) throws Exception {
- CustomRuntimeEventLoop.eventLoop(this.applicationContext, args);
+ public void run() {
+ this.running = true;
+ this.executor.execute(() -> {
+ eventLoop(this.applicationContext);
+ });
}
@SuppressWarnings("unchecked")
- private static void eventLoop(ApplicationContext context, String... args) {
+ private void eventLoop(ConfigurableApplicationContext context) {
+ Environment environment = context.getEnvironment();
logger.info("Starting spring-cloud-function CustomRuntimeEventLoop");
if (logger.isDebugEnabled()) {
logger.debug("AWS LAMBDA ENVIRONMENT: " + System.getenv());
}
- String runtimeApi = System.getenv("AWS_LAMBDA_RUNTIME_API");
+ String runtimeApi = environment.getProperty("AWS_LAMBDA_RUNTIME_API");
String eventUri = MessageFormat.format(LAMBDA_RUNTIME_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE);
if (logger.isDebugEnabled()) {
logger.debug("Event URI: " + eventUri);
@@ -88,49 +98,61 @@ public final class CustomRuntimeEventLoop implements CommandLineRunner {
ObjectMapper mapper = context.getBean(ObjectMapper.class);
logger.info("Entering event loop");
- while (isContinue()) {
+ while (this.isRunning()) {
logger.debug("Attempting to get new event");
- ResponseEntity response = rest.exchange(requestEntity, String.class);
+ ResponseEntity response = this.pollForData(rest, requestEntity);
+
if (logger.isDebugEnabled()) {
logger.debug("New Event received: " + response);
}
- FunctionInvocationWrapper function = locateFunction(functionCatalog, response.getHeaders().getContentType());
- Message eventMessage = AWSLambdaUtils.generateMessage(response.getBody().getBytes(StandardCharsets.UTF_8),
- fromHttp(response.getHeaders()), function.getInputType(), mapper);
- if (logger.isDebugEnabled()) {
- logger.debug("Event message: " + eventMessage);
- }
+ if (response != null) {
+ FunctionInvocationWrapper function = locateFunction(environment, functionCatalog, response.getHeaders().getContentType());
+ Message eventMessage = AWSLambdaUtils.generateMessage(response.getBody().getBytes(StandardCharsets.UTF_8),
+ fromHttp(response.getHeaders()), function.getInputType(), mapper);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event message: " + eventMessage);
+ }
- String requestId = response.getHeaders().getFirst("Lambda-Runtime-Aws-Request-Id");
- String invocationUrl = MessageFormat
- .format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
+ String requestId = response.getHeaders().getFirst("Lambda-Runtime-Aws-Request-Id");
+ String invocationUrl = MessageFormat
+ .format(LAMBDA_INVOCATION_URL_TEMPLATE, runtimeApi, LAMBDA_VERSION_DATE, requestId);
- Message responseMessage = (Message) function.apply(eventMessage);
+ Message responseMessage = (Message) function.apply(eventMessage);
- if (responseMessage != null && logger.isDebugEnabled()) {
- logger.debug("Reply from function: " + responseMessage);
- }
+ if (responseMessage != null && logger.isDebugEnabled()) {
+ logger.debug("Reply from function: " + responseMessage);
+ }
- byte[] outputBody = AWSLambdaUtils.generateOutput(eventMessage, responseMessage, mapper, function.getOutputType());
- ResponseEntity
+
+
+
+
- org.slf4j
- slf4j-jdk14
+ org.springframework.cloud
+ spring-cloud-function-web
+
-
+
+ org.springframework.boot
+ spring-boot-starter-web
+
+
+
+ org.springframework
+ spring-web
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+
+
org.springframework.boot
spring-boot-starter-test
test
-
- io.projectreactor
- reactor-test
- test
-
-
- org.awaitility
- awaitility
- test
-
-
- org.testcontainers
- testcontainers
- 1.14.3
- test
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/ContainerTests.java b/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/ContainerTests.java
deleted file mode 100644
index 20ec4834b..000000000
--- a/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/ContainerTests.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Copyright 2019-2019 the original author or authors.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.example;
-
-import java.util.concurrent.TimeUnit;
-
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.ToStringConsumer;
-import org.testcontainers.utility.MountableFile;
-
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.client.RestTemplate;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/**
- * @author Dave Syer
- *
- */
-public class ContainerTests {
-
- @Test
- @Disabled
- void test() throws Exception {
- ToStringConsumer consumer = new ToStringConsumer();
- try (@SuppressWarnings("resource")
- GenericContainer> container = new GenericContainer<>("lambci/lambda:provided").withLogConsumer(consumer)
- .withCopyFileToContainer(MountableFile.forClasspathResource("testBootstrap"), "/var/task/bootstrap")
- .withEnv("DOCKER_LAMBDA_STAY_OPEN", "1").withExposedPorts(9001)) {
- container.start();
- int port = container.getFirstMappedPort();
- String host = container.getHost();
- LambdaApplication.main(new String[] { "--AWS_LAMBDA_RUNTIME_API=" + host + ":" + port,
- "--_HANDLER=uppercase", "--logging.level.org.springframework=DEBUG" });
- ResponseEntity response = Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> {
- ResponseEntity result = new RestTemplate().postForEntity(
- "http://" + host + ":" + port + "/2015-03-31/functions/foobar/invocations", "foo",
- String.class);
- return result;
- }, result -> result != null);
- assertThat(response.getBody()).isEqualTo("\"FOO\"");
- assertThat(response.getHeaders()).containsKey("X-Amzn-Requestid");
- }
- String output = consumer.toUtf8String();
- assertThat(output).contains("Lambda API listening on port 9001");
- assertThat(output).contains("START RequestId:");
- assertThat(output).contains("END RequestId:");
- }
-
-}
diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java b/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java
new file mode 100644
index 000000000..b7e560f00
--- /dev/null
+++ b/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2019-2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.example;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.cloud.function.adapter.test.aws.AWSCustomRuntime;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.TestPropertySource;
+
+
+/**
+ * @author Oleg Zhurakousky
+ *
+ */
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = {"spring.main.web-application-type=servlet"})
+@ContextConfiguration(classes = {AWSCustomRuntime.class}, initializers = LambdaApplication.class)
+@TestPropertySource(properties = {"_HANDLER=uppercase"})
+public class LambdaApplicationTests {
+ @Autowired
+ private AWSCustomRuntime aws;
+
+ @Test
+ void testWithCustomRuntime() throws Exception {
+ assertThat(aws.exchange("\"oleg\"").getPayload()).isEqualTo("\"OLEG\"");
+ assertThat(aws.exchange("\"dave\"").getPayload()).isEqualTo("\"DAVE\"");
+ }
+}