From 428243ce489d4df35b9bdfabe402512c5a0f1030 Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Wed, 13 Feb 2019 06:26:12 -0600 Subject: [PATCH] Add new feature to initialize a Supplier from a remote HTTP endpoint Kind of like the SupplierExporter but to create the Supplier itself. With this in place you can define the templateUrl (destination) and the originaUrl (source) and use the app as a pipeline for events from/to HTTP. Provide functional bean support for HTTP export Add autoconfig to AWS adapter for custom runtime Fix HttpSupplier to always supply Message if headers are included Fix registration of origin supplier in functional beans Add docs on new AWS features Add custom runtime sample --- docs/src/main/asciidoc/adapters/aws.adoc | 43 +++++ .../spring-cloud-function-adapter-aws/pom.xml | 5 + .../aws/CustomRuntimeAutoConfiguration.java | 60 ++++++ .../adapter/aws/CustomRuntimeInitializer.java | 47 +++++ .../aws/LambdaDestinationResolver.java | 37 ++++ .../main/resources/META-INF/spring.factories | 4 + .../function-sample-aws-custom/pom.xml | 128 +++++++++++++ .../src/assembly/zip.xml | 35 ++++ .../java/com/example/LambdaApplication.java | 38 ++++ .../src/main/resources/application.properties | 5 + .../src/main/resources/bootstrap | 8 + .../com/example/LambdaApplicationTests.java | 18 ++ spring-cloud-function-samples/pom.xml | 1 + .../web/source/ExporterProperties.java | 173 ++++++++++++++++++ ...=> FunctionExporterAutoConfiguration.java} | 63 +++++-- .../source/FunctionExporterInitializer.java | 103 +++++++++++ .../function/web/source/HttpSupplier.java | 106 +++++++++++ .../web/source/SimpleRequestBuilder.java | 18 +- .../function/web/source/SupplierExporter.java | 35 +++- .../web/source/SupplierProperties.java | 3 +- .../main/resources/META-INF/spring.factories | 5 +- .../test/FunctionalExporterTests.java | 116 ++++++++++++ .../function/test/RestConfiguration.java | 67 +++++++ ...tionAutoConfigurationIntegrationTests.java | 133 ++++++++++++++ ...urceAutoConfigurationIntegrationTests.java | 9 +- .../web/source/WebAppIntegrationTests.java | 21 ++- 26 files changed, 1239 insertions(+), 42 deletions(-) create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeAutoConfiguration.java create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/resources/META-INF/spring.factories create mode 100644 spring-cloud-function-samples/function-sample-aws-custom/pom.xml create mode 100644 spring-cloud-function-samples/function-sample-aws-custom/src/assembly/zip.xml create mode 100644 spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java create mode 100644 spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/application.properties create mode 100755 spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/bootstrap create mode 100644 spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java rename spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/{SupplierAutoConfiguration.java => FunctionExporterAutoConfiguration.java} (56%) create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterInitializer.java create mode 100644 spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java create mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java create mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/RestConfiguration.java create mode 100644 spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java diff --git a/docs/src/main/asciidoc/adapters/aws.adoc b/docs/src/main/asciidoc/adapters/aws.adoc index 328aa58be..b8c797b8b 100644 --- a/docs/src/main/asciidoc/adapters/aws.adoc +++ b/docs/src/main/asciidoc/adapters/aws.adoc @@ -4,6 +4,36 @@ The https://aws.amazon.com/[AWS] adapter takes a Spring Cloud Function app and c include::aws-intro.adoc[] +== Functional Bean Definitions + +Your functions will start much quicker if you can use functional bean definitions instead of `@Bean`. To do this make your main class +an `ApplicationContextInitalizer` and use the `registerBean()` methods in `GenericApplicationContext` to +create all the beans you need. You function need sto be registered as a bean of type `FunctionRegistration` so that the input and +output types can be accessed by the framework. There is an example in github (the AWS sample is written in this style). It would +look something like this: + +```java +@SpringBootApplication +public class FuncApplication implements ApplicationContextInitializer { + + public static void main(String[] args) throws Exception { + FunctionalSpringApplication.run(FuncApplication.class, args); + } + + public Function function() { + return value -> new Bar(value.uppercase())); + } + + @Override + public void initialize(GenericApplicationContext context) { + context.registerBean("function", FunctionRegistration.class, + () -> new FunctionRegistration>(function()) + .type(FunctionType.from(Foo.class).to(Bar.class).getType())); + } + +} +``` + == Platfom Specific Features === HTTP and API Gateway @@ -21,3 +51,16 @@ The supported AWS services and generic handler types are listed below: For example, to deploy behind an API Gateway, use `--handler org.springframework.cloud.function.adapter.aws.SpringBootApiGatewayRequestHandler` in your AWS command line (in via the UI) and define a `@Bean` of type `Function,Message>` where `Foo` and `Bar` are POJO types (the data will be marshalled and unmarshalled by AWS using Jackson). + +== Custom Runtime + +An https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html[AWS Lambda custom runtime] can be created really easily using the HTTP export features in Spring Cloud Function Web. To make this work just add Spring Cloud Function AWS and Spring Cloud Function Web as dependencies in your project and set the following in your `application.properties`: + +``` +spring.cloud.function.web.export.enabled=true +spring.cloud.function.web.export.source.url=http://${AWS_LAMBDA_RUNTIME_API:localhost}/2018-06-01/runtime/invocation/next +spring.cloud.function.web.export.sink.url=http://${AWS_LAMBDA_RUNTIME_API:localhost}/2018-06-01/runtime/invocation/{{destination}}/response +spring.cloud.function.web.export.sink.name=origin|uppercase +``` + +where "uppercase" is the name of your function ("origin" is the name of the `Supplier` that is provided by Spring Cloud Function Web). Then provide a `bootstrap` script in the root of your zip/jar that runs the Spring Boot application. The functional bean definition style works for custom runtimes too, and is faster than the `@Bean` style, so the example `FuncApplication` above would work. A custom runtime can start up much quicker even than a functional bean implementation of a Java lambda - it depends mostly on the number of classes you need to load at runtime. Spring doesn't do very much here, so you can reduce the cold start time by only using primitive types in your function, for instance, and not doing any work in custom `@PostConstruct` initializers. \ No newline at end of file diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml index 9224fa2e8..7089ece44 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml @@ -31,6 +31,11 @@ org.springframework.cloud spring-cloud-function-context + + org.springframework.cloud + spring-cloud-function-web + true + org.springframework diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeAutoConfiguration.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeAutoConfiguration.java new file mode 100644 index 000000000..fd0ea1d8a --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeAutoConfiguration.java @@ -0,0 +1,60 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.cloud.function.web.source.DestinationResolver; +import org.springframework.cloud.function.web.source.FunctionExporterAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @author Dave Syer + */ +@Configuration +@AutoConfigureBefore(FunctionExporterAutoConfiguration.class) +public class CustomRuntimeAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + public DestinationResolver destinationResolver() { + return new LambdaDestinationResolver(); + } + + @Bean + public CommandLineRunner backgrounder() { + return args -> background(); + } + + static void background() { + Thread thread = new Thread(() -> { + System.out.println("Started"); + while (true) { + try { + Thread.sleep(500L); + } + catch (InterruptedException e) { + } + } + }); + thread.setDaemon(false); + thread.start(); + } + +} diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java new file mode 100644 index 000000000..c9c33dda7 --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/CustomRuntimeInitializer.java @@ -0,0 +1,47 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import org.springframework.boot.CommandLineRunner; +import org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer; +import org.springframework.cloud.function.web.source.DestinationResolver; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.core.annotation.Order; + +/** + * @author Dave Syer + */ +@Order(0) +public class CustomRuntimeInitializer + implements ApplicationContextInitializer { + + @Override + public void initialize(GenericApplicationContext context) { + if (ContextFunctionCatalogInitializer.enabled && context.getEnvironment() + .getProperty("spring.functional.enabled", Boolean.class, false)) { + if (context.getBeanFactory().getBeanNamesForType(DestinationResolver.class, + false, false).length == 0) { + context.registerBean(LambdaDestinationResolver.class, + () -> new LambdaDestinationResolver()); + } + context.registerBean(CommandLineRunner.class, + () -> args -> CustomRuntimeAutoConfiguration.background()); + } + } + +} diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java new file mode 100644 index 000000000..8ce974ea5 --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/LambdaDestinationResolver.java @@ -0,0 +1,37 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import java.util.function.Supplier; + +import org.springframework.cloud.function.web.source.DestinationResolver; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; + +public class LambdaDestinationResolver implements DestinationResolver { + + @Override + public String destination(Supplier supplier, String name, Object value) { + Message message = (Message) value; + MessageHeaders headers = message.getHeaders(); + if (headers.containsKey("lambda-runtime-aws-request-id")) { + return (String) headers.get("lambda-runtime-aws-request-id"); + } + return "unknown"; + } + +} diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/resources/META-INF/spring.factories b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..207958412 --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/resources/META-INF/spring.factories @@ -0,0 +1,4 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ +org.springframework.cloud.function.adapter.aws.CustomRuntimeAutoConfiguration +org.springframework.context.ApplicationContextInitializer=\ +org.springframework.cloud.function.adapter.aws.CustomRuntimeInitializer diff --git a/spring-cloud-function-samples/function-sample-aws-custom/pom.xml b/spring-cloud-function-samples/function-sample-aws-custom/pom.xml new file mode 100644 index 000000000..5c508ab81 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.1.2.RELEASE + + + io.spring.sample + function-sample-aws-custom + 0.0.1-SNAPSHOT + lambda + Demo project for Spring Cloud Function with custom AWS Lambda runtime + + + 1.8 + 1.0.21.RELEASE + + + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-starter-logging + + + org.springframework.boot + spring-boot-starter-json + + + io.netty + netty-codec-http2 + + + io.netty + netty-transport-native-epoll + + + org.hibernate.validator + hibernate-validator + + + org.synchronoss.cloud + nio-multipart-parser + + + + + org.springframework.cloud + spring-cloud-function-web + + + org.springframework.cloud + spring-cloud-function-adapter-aws + + + org.slf4j + slf4j-jdk14 + + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.cloud + spring-cloud-function-dependencies + 2.1.0.BUILD-SNAPSHOT + pom + import + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + org.springframework.boot.experimental + spring-boot-thin-layout + ${wrapper.version} + + + + + maven-assembly-plugin + + + zip + package + + single + + false + + + + + + com.sample.App + + + + src/assembly/zip.xml + + + + + + + diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/assembly/zip.xml b/spring-cloud-function-samples/function-sample-aws-custom/src/assembly/zip.xml new file mode 100644 index 000000000..544483aec --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/assembly/zip.xml @@ -0,0 +1,35 @@ + + zip + + zip + + + + + target/classes + / + true + + bootstrap + + + + target/classes + / + true + 0775 + + bootstrap + + + + + + /lib + false + runtime + + + \ No newline at end of file diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java b/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java new file mode 100644 index 000000000..730053793 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/main/java/com/example/LambdaApplication.java @@ -0,0 +1,38 @@ +package com.example; + +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.context.FunctionalSpringApplication; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; + +@SpringBootApplication +public class LambdaApplication + implements ApplicationContextInitializer { + + private static Log logger = LogFactory.getLog(LambdaApplication.class); + + public Function uppercase() { + return value -> { + logger.info("Processing: " + value); + return value.toUpperCase(); + }; + } + + public static void main(String[] args) { + FunctionalSpringApplication.run(LambdaApplication.class, args); + } + + @Override + public void initialize(GenericApplicationContext context) { + context.registerBean("uppercase", FunctionRegistration.class, + () -> new FunctionRegistration<>(uppercase()).type( + FunctionType.from(String.class).to(String.class))); + } +} diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/application.properties b/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/application.properties new file mode 100644 index 000000000..a87f94089 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/application.properties @@ -0,0 +1,5 @@ +spring.cloud.function.web.export.enabled=true +spring.cloud.function.web.export.source.url=http://${AWS_LAMBDA_RUNTIME_API:localhost}/2018-06-01/runtime/invocation/next +spring.cloud.function.web.export.sink.url=http://${AWS_LAMBDA_RUNTIME_API:localhost}/2018-06-01/runtime/invocation/{{destination}}/response +spring.cloud.function.web.export.sink.name=origin|uppercase +# spring.cloud.function.web.supplier.debug=true diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/bootstrap b/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/bootstrap new file mode 100755 index 000000000..ab39ddb01 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/main/resources/bootstrap @@ -0,0 +1,8 @@ +#!/bin/sh + +cd ${LAMBDA_TASK_ROOT:-.} + +java -Dspring.main.web-application-type=none -Dspring.jmx.enabled=false \ + -noverify -XX:TieredStopAtLevel=1 -Xss256K -XX:MaxMetaspaceSize=128M \ + -Djava.security.egd=file:/dev/./urandom \ + -cp .:`echo lib/*.jar | tr ' ' :` com.example.LambdaApplication diff --git a/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java b/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java new file mode 100644 index 000000000..eacfa6ef8 --- /dev/null +++ b/spring-cloud-function-samples/function-sample-aws-custom/src/test/java/com/example/LambdaApplicationTests.java @@ -0,0 +1,18 @@ +package com.example; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.cloud.function.context.test.FunctionalSpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@FunctionalSpringBootTest +public class LambdaApplicationTests { + + @Test + public void contextLoads() { + } + +} + diff --git a/spring-cloud-function-samples/pom.xml b/spring-cloud-function-samples/pom.xml index 706891e49..0243b118e 100644 --- a/spring-cloud-function-samples/pom.xml +++ b/spring-cloud-function-samples/pom.xml @@ -21,6 +21,7 @@ function-sample-compiler function-sample-task function-sample-aws + function-sample-aws-custom function-sample-azure function-sample-spring-integration diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java new file mode 100644 index 000000000..92111d985 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/ExporterProperties.java @@ -0,0 +1,173 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.source; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author Dave Syer + * + */ +@ConfigurationProperties("spring.cloud.function.web.export") +public class ExporterProperties { + + /** + * Flag to indicate that the supplier emits HTTP requests automatically on startup. + */ + private boolean autoStartup = true; + + /** + * Flag to indicate that extra logging is required for the supplier. + */ + private boolean debug = true; + + /** + * Properties related to a source of items (via an HTTP GET on startup). + */ + private Source source = new Source(); + + /** + * Properties related to a sink of items (via an HTTP POST on startup). + */ + private Sink sink = new Sink(); + + /** + * Flag to enable the export of a supplier. + */ + private boolean enabled; + + public boolean isEnabled() { + return this.enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public boolean isAutoStartup() { + return this.autoStartup; + } + + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + public boolean isDebug() { + return this.debug; + } + + public void setDebug(boolean debug) { + this.debug = debug; + } + + public Source getSource() { + return this.source; + } + + public Sink getSink() { + return this.sink; + } + + public static class Source { + + /** + * URL template for creating a virtual Supplier from HTTP GET. + */ + private String url; + + /** + * If the origin url is set, the type of content expected (e.g. a POJO class). + * Defaults to String. + */ + private Class type; + + /** + * Include the incoming headers in the outgoing Supplier. If true the supplier + * will be of generic type Message of T equal to the source type. + */ + private boolean includeHeaders = true; + + public String getUrl() { + return this.url; + } + + public void setUrl(String url) { + this.url = url; + } + + public Class getType() { + return this.type == null ? String.class : this.type; + } + + public void setType(Class type) { + this.type = type; + } + + public void setIncludeHeaders(boolean includeHeaders) { + this.includeHeaders = includeHeaders; + } + + public boolean isIncludeHeaders() { + return this.includeHeaders; + } + + } + + public static class Sink { + + /** + * URL template for outgoing HTTP requests. Each item from the supplier is POSTed + * to this target. + */ + private String url; + + /** + * Additional headers to append to the outgoing HTTP requests. + */ + private Map headers = new LinkedHashMap<>(); + + /** + * The name of a specific existing Supplier to export from the function catalog. + */ + private String name; + + public String getName() { + return this.name; + } + + public void setName(String name) { + this.name = name; + } + + public String getUrl() { + return this.url; + } + + public void setUrl(String url) { + this.url = url; + } + + public Map getHeaders() { + return this.headers; + } + + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierAutoConfiguration.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterAutoConfiguration.java similarity index 56% rename from spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierAutoConfiguration.java rename to spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterAutoConfiguration.java index 6657cf027..524cf6c7d 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierAutoConfiguration.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterAutoConfiguration.java @@ -16,6 +16,11 @@ package org.springframework.cloud.function.web.source; +import java.util.function.Supplier; + +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.AnyNestedCondition; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; @@ -23,7 +28,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnNotWebAppli import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.function.context.FunctionCatalog; -import org.springframework.cloud.function.web.source.SupplierAutoConfiguration.SourceActiveCondition; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.web.source.FunctionExporterAutoConfiguration.SourceActiveCondition; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; import org.springframework.context.annotation.Configuration; @@ -37,26 +44,48 @@ import org.springframework.web.reactive.function.client.WebClient; @Configuration @ConditionalOnClass(WebClient.class) @Conditional(SourceActiveCondition.class) -@EnableConfigurationProperties(SupplierProperties.class) -@ConditionalOnProperty(prefix = "spring.cloud.function.web.supplier", name = "enabled", matchIfMissing = true) -class SupplierAutoConfiguration { +@EnableConfigurationProperties(ExporterProperties.class) +@ConditionalOnProperty(prefix = "spring.cloud.function.web.export", name = "enabled", matchIfMissing = true) +public class FunctionExporterAutoConfiguration { - @Bean - public SupplierExporter sourceForwarder(RequestBuilder requestBuilder, - DestinationResolver destinationResolver, FunctionCatalog catalog, - WebClient.Builder builder, SupplierProperties props) { - return new SupplierExporter(requestBuilder, destinationResolver, catalog, - builder.build(), props); + private ExporterProperties props; + + @Autowired + FunctionExporterAutoConfiguration(ExporterProperties props) { + this.props = props; } @Bean - public RequestBuilder simpleRequestBuilder(SupplierProperties props, - Environment environment) { - SimpleRequestBuilder builder = new SimpleRequestBuilder(environment); - if (props.getTemplateUrl() != null) { - builder.setTemplateUrl(props.getTemplateUrl()); + @ConditionalOnProperty(prefix = "spring.cloud.function.web.export.sink", name = "url") + public SupplierExporter sourceForwarder(RequestBuilder requestBuilder, + DestinationResolver destinationResolver, FunctionCatalog catalog, + WebClient.Builder builder) { + return new SupplierExporter(requestBuilder, destinationResolver, catalog, + builder.build(), this.props); + } + + @Bean + @ConditionalOnProperty(prefix = "spring.cloud.function.web.export.source", name = "url") + public FunctionRegistration>> origin(WebClient.Builder builder) { + HttpSupplier supplier = new HttpSupplier(builder.build(), this.props); + FunctionRegistration>> registration = new FunctionRegistration<>( + supplier); + FunctionType type = FunctionType.supplier(this.props.getSource().getType()) + .wrap(Flux.class); + if (this.props.getSource().isIncludeHeaders()) { + type = type.message(); } - builder.setHeaders(props.getHeaders()); + registration = registration.type(type); + return registration; + } + + @Bean + public RequestBuilder simpleRequestBuilder(Environment environment) { + SimpleRequestBuilder builder = new SimpleRequestBuilder(environment); + if (this.props.getSink().getUrl() != null) { + builder.setTemplateUrl(this.props.getSink().getUrl()); + } + builder.setHeaders(this.props.getSink().getHeaders()); return builder; } @@ -77,7 +106,7 @@ class SupplierAutoConfiguration { } - @ConditionalOnProperty(prefix = "spring.cloud.function.web.supplier", name = "enabled") + @ConditionalOnProperty(prefix = "spring.cloud.function.web.export", name = "enabled") static class Enabled { } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterInitializer.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterInitializer.java new file mode 100644 index 000000000..f5df7712a --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/FunctionExporterInitializer.java @@ -0,0 +1,103 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.source; + +import org.springframework.boot.web.reactive.context.ReactiveWebApplicationContext; +import org.springframework.cloud.function.context.FunctionCatalog; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.config.ContextFunctionCatalogInitializer; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.util.ClassUtils; +import org.springframework.web.context.WebApplicationContext; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * @author Dave Syer + * @since 2.0 + * + */ +class FunctionExporterInitializer + implements ApplicationContextInitializer { + + @Override + public void initialize(GenericApplicationContext context) { + if (ContextFunctionCatalogInitializer.enabled && context.getEnvironment() + .getProperty("spring.functional.enabled", Boolean.class, false) + && isExporting(context)) { + registerWebClient(context); + registerExport(context); + } + } + + private void registerWebClient(GenericApplicationContext context) { + if (context.getBeanFactory().getBeanNamesForType(WebClient.Builder.class, false, + false).length == 0) { + context.registerBean(WebClient.Builder.class, () -> WebClient.builder()); + } + } + + private boolean isExporting(GenericApplicationContext context) { + if (context.getEnvironment().getProperty("spring.cloud.function.web.export", + Boolean.class, false)) { + return true; + } + if (ClassUtils.isPresent("org.springframework.web.context.WebApplicationContext", + getClass().getClassLoader())) { + if (context instanceof WebApplicationContext + || context instanceof ReactiveWebApplicationContext) { + return false; + } + } + return true; + } + + private void registerExport(GenericApplicationContext context) { + context.registerBean(ExporterProperties.class, () -> new ExporterProperties()); + context.registerBean(FunctionExporterAutoConfiguration.class, + () -> new FunctionExporterAutoConfiguration( + context.getBean(ExporterProperties.class))); + if (context.getBeanFactory().getBeanNamesForType(DestinationResolver.class, false, + false).length == 0) { + context.registerBean(DestinationResolver.class, + () -> context.getBean(FunctionExporterAutoConfiguration.class) + .simpleDestinationResolver()); + } + if (context.getBeanFactory().getBeanNamesForType(RequestBuilder.class, false, + false).length == 0) { + context.registerBean(RequestBuilder.class, + () -> context.getBean(FunctionExporterAutoConfiguration.class) + .simpleRequestBuilder(context.getEnvironment())); + } + if (context.getEnvironment() + .getProperty("spring.cloud.function.web.export.source.url") != null) { + context.registerBean("origin", FunctionRegistration.class, + () -> context.getBean(FunctionExporterAutoConfiguration.class) + .origin(context.getBean(WebClient.Builder.class))); + } + if (context.getEnvironment() + .getProperty("spring.cloud.function.web.export.sink.url") != null) { + context.registerBean(SupplierExporter.class, + () -> context.getBean(FunctionExporterAutoConfiguration.class) + .sourceForwarder(context.getBean(RequestBuilder.class), + context.getBean(DestinationResolver.class), + context.getBean(FunctionCatalog.class), + context.getBean(WebClient.Builder.class))); + } + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java new file mode 100644 index 000000000..9c1bf0d67 --- /dev/null +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java @@ -0,0 +1,106 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.source; + +import java.util.function.Supplier; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.cloud.function.web.util.HeaderUtils; +import org.springframework.http.HttpStatus; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * A {@link Supplier} that pulls data from an HTTP endpoint. Repeatedly polls the endpoint + * until a non-2xx response is received. + * + * @author Dave Syer + */ +public class HttpSupplier implements Supplier> { + + private static Log logger = LogFactory.getLog(HttpSupplier.class); + + private WebClient client; + + private ExporterProperties props; + + /** + * @param client the WebClient to use. The baseUrl should be set. + * @param props the ExporterProperties to use to parameterize the requests. + */ + public HttpSupplier(WebClient client, ExporterProperties props) { + this.client = client; + this.props = props; + } + + @Override + public Flux get() { + return get(this.client); + } + + private Flux get(WebClient client) { + Flux result = client.get().uri(this.props.getSource().getUrl()).exchange() + .flatMap(this::transform).repeat(); + if (this.props.isDebug()) { + result = result.log(); + } + return result.onErrorResume(TerminateException.class, error -> Mono.empty()); + } + + private Mono transform(ClientResponse response) { + HttpStatus status = response.statusCode(); + if (!status.is2xxSuccessful()) { + if (this.props.isDebug()) { + logger.info("Terminated supplier with status=" + response.statusCode()); + } + return Mono.error(TerminateException.INSTANCE); + } + return response.bodyToMono(this.props.getSource().getType()) + .map(value -> message(response, value)); + } + + private Object message(ClientResponse response, Object payload) { + if (!this.props.getSource().isIncludeHeaders()) { + return payload; + } + return MessageBuilder.withPayload(payload) + .copyHeaders(HeaderUtils.fromHttp( + HeaderUtils.sanitize(response.headers().asHttpHeaders()))) + .build(); + } + + private static class TerminateException extends RuntimeException { + + static final TerminateException INSTANCE = new TerminateException(); + + TerminateException() { + super("Planned termination"); + } + + @Override + public synchronized Throwable fillInStackTrace() { + return this; + } + + } + +} diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SimpleRequestBuilder.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SimpleRequestBuilder.java index b2e4962a5..ff220c328 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SimpleRequestBuilder.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SimpleRequestBuilder.java @@ -18,11 +18,15 @@ package org.springframework.cloud.function.web.source; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import org.springframework.cloud.function.web.util.HeaderUtils; import org.springframework.core.env.Environment; import org.springframework.http.HttpHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; /** * @author Dave Syer @@ -42,13 +46,17 @@ class SimpleRequestBuilder implements RequestBuilder { @Override public HttpHeaders headers(String destination, Object value) { - // TODO: add message headers if any - HttpHeaders result = new HttpHeaders(); + MessageHeaders incoming = new MessageHeaders(Collections.emptyMap()); + if (value instanceof Message) { + Message message = (Message) value; + incoming = message.getHeaders(); + } + HttpHeaders result = HeaderUtils.fromMessage(incoming); for (String key : this.headers.keySet()) { String header = this.headers.get(key); header = header.replace("${destination}", destination); header = this.environment.resolvePlaceholders(header); - result.add(key, header); + result.set(key, header); } return result; } @@ -56,8 +64,8 @@ class SimpleRequestBuilder implements RequestBuilder { @Override public URI uri(String destination) { try { - return new URI(this.environment.resolvePlaceholders( - this.baseUrl.replace("${destination}", destination))); + return new URI(this.baseUrl.replace("${destination}", destination) + .replace("{{destination}}", destination)); } catch (URISyntaxException e) { throw new IllegalStateException("Cannot create URI", e); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java index 13676b3e7..93399782e 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java @@ -21,15 +21,16 @@ import java.util.Collections; import java.util.Set; import java.util.function.Supplier; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.SmartLifecycle; import org.springframework.http.HttpHeaders; -import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.messaging.Message; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; @@ -40,7 +41,9 @@ import org.springframework.web.reactive.function.client.WebClient; * @author Dave Syer * */ -class SupplierExporter implements SmartLifecycle { +public class SupplierExporter implements SmartLifecycle { + + private static Log logger = LogFactory.getLog(SupplierExporter.class); private final FunctionCatalog catalog; @@ -64,14 +67,14 @@ class SupplierExporter implements SmartLifecycle { SupplierExporter(RequestBuilder requestBuilder, DestinationResolver destinationResolver, FunctionCatalog catalog, - WebClient client, SupplierProperties props) { + WebClient client, ExporterProperties props) { this.requestBuilder = requestBuilder; this.destinationResolver = destinationResolver; this.catalog = catalog; this.client = client; this.debug = props.isDebug(); this.autoStartup = props.isAutoStartup(); - this.supplier = props.getName(); + this.supplier = props.getSink().getName(); } @Override @@ -82,19 +85,24 @@ class SupplierExporter implements SmartLifecycle { this.running = true; this.ok = true; + logger.info("Starting"); Flux streams = Flux.empty(); Set names = this.supplier == null ? this.catalog.getNames(Supplier.class) : Collections.singleton(this.supplier); for (String name : names) { Supplier> supplier = this.catalog.lookup(Supplier.class, name); + if (supplier == null) { + logger.warn("No such Supplier: " + name); + continue; + } streams = streams.mergeWith(forward(supplier, name)); } this.subscription = streams.doOnError(error -> { this.ok = false; if (!this.debug) { - error.printStackTrace(); + logger.info(error); } }).doOnTerminate(() -> this.running = false).doOnNext(value -> { if (this.subscription != null && !this.running) { @@ -104,17 +112,25 @@ class SupplierExporter implements SmartLifecycle { } private Flux forward(Supplier> supplier, String name) { - return supplier.get().publishOn(Schedulers.parallel()).flatMap(value -> { + return supplier.get().flatMap(value -> { String destination = this.destinationResolver.destination(supplier, name, value); + if (this.debug) { + logger.info("Posting to: " + destination); + } return post(uri(destination), destination, value); }); } private Mono post(URI uri, String destination, Object value) { + Object body = value; + if (value instanceof Message) { + Message message = (Message) value; + body = message.getPayload(); + } Mono result = this.client.post().uri(uri) - .headers(headers -> headers(headers, destination, value)) - .body(BodyInserters.fromObject(value)).exchange(); + .headers(headers -> headers(headers, destination, value)).syncBody(body) + .exchange(); if (this.debug) { result = result.log(); } @@ -135,6 +151,7 @@ class SupplierExporter implements SmartLifecycle { @Override public void stop() { + logger.info("Stopping"); this.running = false; } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierProperties.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierProperties.java index 10ccfd1a4..71d97db8f 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierProperties.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierProperties.java @@ -23,9 +23,10 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** * @author Dave Syer - * + * @deprecated in favour of {@link ExporterProperties} */ @ConfigurationProperties("spring.cloud.function.web.supplier") +@Deprecated public class SupplierProperties { private boolean autoStartup = true; diff --git a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories index bf62195c4..62f7b592b 100644 --- a/spring-cloud-function-web/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-function-web/src/main/resources/META-INF/spring.factories @@ -1,9 +1,10 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\ org.springframework.cloud.function.web.mvc.ReactorAutoConfiguration,\ -org.springframework.cloud.function.web.source.SupplierAutoConfiguration +org.springframework.cloud.function.web.source.FunctionExporterAutoConfiguration org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc=\ org.springframework.cloud.function.web.flux.ReactorAutoConfiguration,\ org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration org.springframework.context.ApplicationContextInitializer=\ -org.springframework.cloud.function.web.function.FunctionEndpointInitializer +org.springframework.cloud.function.web.function.FunctionEndpointInitializer,\ +org.springframework.cloud.function.web.source.FunctionExporterInitializer diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java new file mode 100644 index 000000000..8e27baae7 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java @@ -0,0 +1,116 @@ +/* + * Copyright 2012-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.test; + +import java.util.function.Function; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.cloud.function.context.FunctionRegistration; +import org.springframework.cloud.function.context.FunctionType; +import org.springframework.cloud.function.context.test.FunctionalSpringBootTest; +import org.springframework.cloud.function.test.FunctionalExporterTests.ApplicationConfiguration; +import org.springframework.cloud.function.web.source.SupplierExporter; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.SocketUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@FunctionalSpringBootTest(classes = ApplicationConfiguration.class, webEnvironment = WebEnvironment.NONE, properties = { + "spring.main.web-application-type=none", + "spring.cloud.function.web.export.sink.url=http://localhost:${my.port}", + "spring.cloud.function.web.export.source.url=http://localhost:${my.port}", + "spring.cloud.function.web.export.sink.name=origin|uppercase", + "spring.cloud.function.web.export.debug=true", + "spring.cloud.function.web.export.enabled=true" }) +public class FunctionalExporterTests { + + @Autowired + private SupplierExporter forwarder; + + private static RestConfiguration app; + + private static ConfigurableApplicationContext context; + + @BeforeClass + public static void init() throws Exception { + String port = "" + SocketUtils.findAvailableTcpPort(); + System.setProperty("server.port", port); + System.setProperty("my.port", port); + context = SpringApplication.run(RestConfiguration.class, + "--spring.main.web-application-type=reactive"); + app = context.getBean(RestConfiguration.class); + // Sometimes the server doesn't start quick enough + Thread.sleep(500L); + } + + @AfterClass + public static void close() { + System.clearProperty("server.port"); + if (context != null) { + context.close(); + } + } + + @Test + public void words() throws Exception { + int count = 0; + while (this.forwarder.isRunning() && count++ < 1000) { + Thread.sleep(20); + } + // It completed + assertThat(this.forwarder.isRunning()).isFalse(); + assertThat(FunctionalExporterTests.app.inputs).contains("HELLO"); + assertThat(this.forwarder.isOk()).isTrue(); + } + + @SpringBootConfiguration + protected static class ApplicationConfiguration + implements ApplicationContextInitializer { + + Function, Message> uppercase() { + return value -> MessageBuilder.withPayload(value.getPayload().toUpperCase()) + .copyHeaders(value.getHeaders()).build(); + } + + @Override + public void initialize(GenericApplicationContext context) { + context.registerBean("uppercase", FunctionRegistration.class, + () -> new FunctionRegistration<>(uppercase()).type( + FunctionType.from(String.class).to(String.class).message())); + } + + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/RestConfiguration.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/RestConfiguration.java new file mode 100644 index 000000000..dd57e83d1 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/RestConfiguration.java @@ -0,0 +1,67 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +@SpringBootConfiguration +@EnableAutoConfiguration +@RestController +public class RestConfiguration { + + private static Log logger = LogFactory.getLog(RestConfiguration.class); + + List inputs = new ArrayList<>(); + + private Iterator outputs = Arrays.asList("hello", "world").iterator(); + + @GetMapping("/") + ResponseEntity home() { + logger.info("HOME"); + if (this.outputs.hasNext()) { + return ResponseEntity.ok(this.outputs.next()); + } + return ResponseEntity.notFound().build(); + } + + @PostMapping("/") + ResponseEntity accept(@RequestBody String body) { + logger.info("ACCEPT"); + this.inputs.add(body); + return ResponseEntity.accepted().body(body); + } + + public static void main(String[] args) throws Exception { + SpringApplication.run(RestConfiguration.class, args); + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java new file mode 100644 index 000000000..4d14b1ecd --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java @@ -0,0 +1,133 @@ +/* + * Copyright 2018-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.source; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.cloud.function.web.source.FunctionAutoConfigurationIntegrationTests.ApplicationConfiguration; +import org.springframework.cloud.function.web.source.FunctionAutoConfigurationIntegrationTests.RestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.SocketUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Dave Syer + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT, properties = { + "spring.cloud.function.web.export.sink.url=http://localhost:${server.port}", + "spring.cloud.function.web.export.source.url=http://localhost:${server.port}", + "spring.cloud.function.web.export.sink.name=origin|uppercase", + "spring.cloud.function.web.export.debug=true", + "spring.cloud.function.web.export.enabled=true" }) +@ContextConfiguration(classes = { RestConfiguration.class, + ApplicationConfiguration.class }) +public class FunctionAutoConfigurationIntegrationTests { + + @Autowired + private SupplierExporter forwarder; + + @Autowired + private RestConfiguration app; + + @BeforeClass + public static void init() { + System.setProperty("server.port", "" + SocketUtils.findAvailableTcpPort()); + } + + @AfterClass + public static void close() { + System.clearProperty("server.port"); + } + + @Test + public void copiesMessages() throws Exception { + int count = 0; + while (this.forwarder.isRunning() && count++ < 100) { + Thread.sleep(20); + } + // It completed + assertThat(this.forwarder.isRunning()).isFalse(); + assertThat(this.forwarder.isOk()).isTrue(); + assertThat(this.app.inputs).contains("HELLO"); + assertThat(this.app.inputs).contains("WORLD"); + } + + @EnableAutoConfiguration + @TestConfiguration + public static class ApplicationConfiguration { + + @Bean + public Function uppercase() { + return value -> value.toUpperCase(); + } + + } + + @TestConfiguration + @RestController + public static class RestConfiguration { + + private static Log logger = LogFactory.getLog(RestConfiguration.class); + + private List inputs = new ArrayList<>(); + + private Iterator outputs = Arrays.asList("hello", "world").iterator(); + + @GetMapping("/") + ResponseEntity home() { + logger.info("HOME"); + if (this.outputs.hasNext()) { + return ResponseEntity.ok(this.outputs.next()); + } + return ResponseEntity.notFound().build(); + } + + @PostMapping("/") + void accept(@RequestBody String body) { + logger.info("ACCEPT"); + this.inputs.add(body); + } + + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/SourceAutoConfigurationIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/SourceAutoConfigurationIntegrationTests.java index 180a195f5..f6f5fde26 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/SourceAutoConfigurationIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/SourceAutoConfigurationIntegrationTests.java @@ -26,7 +26,6 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.cloud.function.web.RestApplication; import org.springframework.cloud.function.web.source.SourceAutoConfigurationIntegrationTests.ApplicationConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.test.context.ContextConfiguration; @@ -39,9 +38,11 @@ import static org.assertj.core.api.Assertions.assertThat; * */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.NONE, // - properties = "spring.cloud.function.web.supplier.templateUrl=http://localhost:9999/notthere") -@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class }) +// @formatter:off +@SpringBootTest(webEnvironment = WebEnvironment.NONE, + properties = "spring.cloud.function.web.export.sink.url=http://nosuchhost") +// @formatter:on +@ContextConfiguration(classes = { ApplicationConfiguration.class }) public class SourceAutoConfigurationIntegrationTests { @Autowired diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java index fcd53f278..4bd6b36b6 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java @@ -24,6 +24,8 @@ import java.util.function.Supplier; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,6 +39,7 @@ import org.springframework.cloud.function.web.source.WebAppIntegrationTests.Appl import org.springframework.context.annotation.Bean; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.SocketUtils; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; @@ -48,13 +51,13 @@ import static org.assertj.core.api.Assertions.assertThat; * */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, properties = { +@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT, properties = { "spring.main.web-application-type=reactive", - "spring.cloud.function.web.supplier.templateUrl=http://localhost:${local.server.port}/values", + "spring.cloud.function.web.export.sink.url=http://localhost:${server.port}/values", // in a webapp we have to explicitly enable the export - "spring.cloud.function.web.supplier.enabled=true", + "spring.cloud.function.web.export.enabled=true", // manually so we know the webapp is listening when we start - "spring.cloud.function.web.supplier.autoStartup=false" }) + "spring.cloud.function.web.export.autoStartup=false" }) @ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class }) public class WebAppIntegrationTests { @@ -66,6 +69,16 @@ public class WebAppIntegrationTests { @Autowired private ApplicationConfiguration app; + @BeforeClass + public static void init() { + System.setProperty("server.port", "" + SocketUtils.findAvailableTcpPort()); + } + + @AfterClass + public static void close() { + System.clearProperty("server.port"); + } + @Test public void posts() throws Exception { this.forwarder.start();