diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java index 9c1bf0d67..7c21985cd 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/HttpSupplier.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.web.source; +import java.time.Duration; import java.util.function.Supplier; import org.apache.commons.logging.Log; @@ -31,9 +32,11 @@ import org.springframework.web.reactive.function.client.WebClient; /** * A {@link Supplier} that pulls data from an HTTP endpoint. Repeatedly polls the endpoint - * until a non-2xx response is received. + * until a non-2xx response is received, at which point it will repeatedly produced a + * Mono at 1 sec intervals until the next 2xx response. * * @author Dave Syer + * @author Oleg Zhurakousky */ public class HttpSupplier implements Supplier> { @@ -70,9 +73,9 @@ public class HttpSupplier implements Supplier> { HttpStatus status = response.statusCode(); if (!status.is2xxSuccessful()) { if (this.props.isDebug()) { - logger.info("Terminated supplier with status=" + response.statusCode()); + logger.info("Delaying supplier based on status=" + response.statusCode()); } - return Mono.error(TerminateException.INSTANCE); + return Mono.delay(Duration.ofSeconds(1)); } return response.bodyToMono(this.props.getSource().getType()) .map(value -> message(response, value)); @@ -88,10 +91,10 @@ public class HttpSupplier implements Supplier> { .build(); } + @SuppressWarnings("serial") private static class TerminateException extends RuntimeException { - static final TerminateException INSTANCE = new TerminateException(); - + @SuppressWarnings("unused") TerminateException() { super("Planned termination"); } @@ -102,5 +105,4 @@ public class HttpSupplier implements Supplier> { } } - } diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java index 93399782e..1c59350e9 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java @@ -16,6 +16,7 @@ package org.springframework.cloud.function.web.source; +import java.net.ConnectException; import java.net.URI; import java.util.Collections; import java.util.Set; @@ -39,6 +40,7 @@ import org.springframework.web.reactive.function.client.WebClient; * endpoint. * * @author Dave Syer + * @author Oleg Zhurakousky * */ public class SupplierExporter implements SmartLifecycle { @@ -82,9 +84,6 @@ public class SupplierExporter implements SmartLifecycle { if (this.running) { return; } - - this.running = true; - this.ok = true; logger.info("Starting"); Flux streams = Flux.empty(); @@ -99,16 +98,62 @@ public class SupplierExporter implements SmartLifecycle { streams = streams.mergeWith(forward(supplier, name)); } - this.subscription = streams.doOnError(error -> { - this.ok = false; - if (!this.debug) { - logger.info(error); - } - }).doOnTerminate(() -> this.running = false).doOnNext(value -> { - if (this.subscription != null && !this.running) { - this.subscription.dispose(); - } - }).subscribe(); + this.subscription = streams + .retry(error -> { + /* + * The ConnectException may happen if a server is not yet available/reachable + * The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses + */ + boolean retry = error instanceof ConnectException || error instanceof ClassCastException + && this.running; + if (!retry) { + this.ok = false; + if (!this.debug) { + logger.info(error); + } + stop(); + } + return retry; + }) + .doOnComplete(() -> { + stop(); + }) + .subscribe(); + + this.ok = true; + this.running = true; + } + + public boolean isOk() { + return this.ok; + } + + @Override + public void stop() { + logger.info("Stopping"); + this.running = false; + this.subscription.dispose(); + } + + @Override + public boolean isRunning() { + return this.running; + } + + @Override + public int getPhase() { + return 0; + } + + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + @Override + public void stop(Runnable callback) { + stop(); + callback.run(); } private Flux forward(Supplier> supplier, String name) { @@ -144,36 +189,4 @@ public class SupplierExporter implements SmartLifecycle { private URI uri(String destination) { return this.requestBuilder.uri(destination); } - - public boolean isOk() { - return this.ok; - } - - @Override - public void stop() { - logger.info("Stopping"); - this.running = false; - } - - @Override - public boolean isRunning() { - return this.running; - } - - @Override - public int getPhase() { - return 0; - } - - @Override - public boolean isAutoStartup() { - return this.autoStartup; - } - - @Override - public void stop(Runnable callback) { - stop(); - callback.run(); - } - } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java index 8e27baae7..d9a2194fb 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/test/FunctionalExporterTests.java @@ -86,11 +86,10 @@ public class FunctionalExporterTests { @Test public void words() throws Exception { int count = 0; - while (this.forwarder.isRunning() && count++ < 1000) { + while (this.forwarder.isRunning() && count++ < 10) { Thread.sleep(20); } // It completed - assertThat(this.forwarder.isRunning()).isFalse(); assertThat(FunctionalExporterTests.app.inputs).contains("HELLO"); assertThat(this.forwarder.isOk()).isTrue(); } diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java index 4d14b1ecd..d6741e480 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationIntegrationTests.java @@ -38,7 +38,6 @@ import org.springframework.cloud.function.web.source.FunctionAutoConfigurationIn import org.springframework.cloud.function.web.source.FunctionAutoConfigurationIntegrationTests.RestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.ResponseEntity; -import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.SocketUtils; import org.springframework.web.bind.annotation.GetMapping; @@ -53,14 +52,14 @@ import static org.assertj.core.api.Assertions.assertThat; * */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT, properties = { +@SpringBootTest(classes = { RestConfiguration.class, + ApplicationConfiguration.class }, + webEnvironment = WebEnvironment.DEFINED_PORT, properties = { "spring.cloud.function.web.export.sink.url=http://localhost:${server.port}", "spring.cloud.function.web.export.source.url=http://localhost:${server.port}", "spring.cloud.function.web.export.sink.name=origin|uppercase", "spring.cloud.function.web.export.debug=true", "spring.cloud.function.web.export.enabled=true" }) -@ContextConfiguration(classes = { RestConfiguration.class, - ApplicationConfiguration.class }) public class FunctionAutoConfigurationIntegrationTests { @Autowired @@ -82,11 +81,10 @@ public class FunctionAutoConfigurationIntegrationTests { @Test public void copiesMessages() throws Exception { int count = 0; - while (this.forwarder.isRunning() && count++ < 100) { + while (this.forwarder.isRunning() && count++ < 10) { Thread.sleep(20); } // It completed - assertThat(this.forwarder.isRunning()).isFalse(); assertThat(this.forwarder.isOk()).isTrue(); assertThat(this.app.inputs).contains("HELLO"); assertThat(this.app.inputs).contains("WORLD"); diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationWithRetriesIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationWithRetriesIntegrationTests.java new file mode 100644 index 000000000..1e4ce1e66 --- /dev/null +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/FunctionAutoConfigurationWithRetriesIntegrationTests.java @@ -0,0 +1,139 @@ +/* + * Copyright 2019-2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.web.source; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Function; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.cloud.function.web.source.FunctionAutoConfigurationWithRetriesIntegrationTests.ApplicationConfiguration; +import org.springframework.cloud.function.web.source.FunctionAutoConfigurationWithRetriesIntegrationTests.RestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.ResponseEntity; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.SocketUtils; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Oleg Zhurakousky + * + */ +@RunWith(SpringRunner.class) +@SpringBootTest(classes = { RestConfiguration.class, + ApplicationConfiguration.class }, + webEnvironment = WebEnvironment.DEFINED_PORT, properties = { + "spring.cloud.function.web.export.sink.url=http://localhost:${server.port}", + "spring.cloud.function.web.export.source.url=http://localhost:${server.port}", + "spring.cloud.function.web.export.sink.name=origin|uppercase", + "spring.cloud.function.web.export.debug=true", + "spring.cloud.function.web.export.enabled=true" }) +public class FunctionAutoConfigurationWithRetriesIntegrationTests { + + @Autowired + private SupplierExporter forwarder; + + @Autowired + private RestConfiguration app; + + @BeforeClass + public static void init() { + System.setProperty("server.port", "" + SocketUtils.findAvailableTcpPort()); + } + + @AfterClass + public static void close() { + System.clearProperty("server.port"); + } + + @Test + public void copiesMessages() throws Exception { + int count = 0; + while (this.forwarder.isRunning() && count++ < 30) { + Thread.sleep(200); + } + // It completed + assertThat(this.forwarder.isOk()).isTrue(); + assertThat(this.forwarder.isRunning()).isFalse(); + assertThat(this.app.inputs.size()).isEqualTo(4); + assertThat(this.app.inputs).contains("2"); + assertThat(this.app.inputs).contains("4"); + assertThat(this.app.inputs).contains("6"); + assertThat(this.app.inputs).contains("8"); + } + + @EnableAutoConfiguration + @TestConfiguration + public static class ApplicationConfiguration { + + @Bean + public Function uppercase() { + return value -> value.toUpperCase(); + } + + } + + @TestConfiguration + @RestController + public static class RestConfiguration { + + @Autowired + private SupplierExporter forwarder; + + private static Log logger = LogFactory.getLog(RestConfiguration.class); + + private List inputs = new ArrayList<>(); + + private int counter; + + @GetMapping("/") + ResponseEntity home() { + logger.info("HOME"); + if (++counter % 2 == 0 && counter < 10) { + return ResponseEntity.ok(String.valueOf(counter)); + } + if (counter >= 10) { + forwarder.stop(); + } + return ResponseEntity.notFound().build(); + } + + @PostMapping("/") + void accept(@RequestBody String body) { + logger.info("ACCEPT"); + this.inputs.add(body); + } + + } + +} diff --git a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java index 4bd6b36b6..7cf586ba1 100644 --- a/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java +++ b/spring-cloud-function-web/src/test/java/org/springframework/cloud/function/web/source/WebAppIntegrationTests.java @@ -37,7 +37,6 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.cloud.function.web.RestApplication; import org.springframework.cloud.function.web.source.WebAppIntegrationTests.ApplicationConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.util.SocketUtils; import org.springframework.web.bind.annotation.PostMapping; @@ -51,14 +50,14 @@ import static org.assertj.core.api.Assertions.assertThat; * */ @RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT, properties = { +@SpringBootTest(classes = { RestApplication.class, ApplicationConfiguration.class }, + webEnvironment = WebEnvironment.DEFINED_PORT, properties = { "spring.main.web-application-type=reactive", "spring.cloud.function.web.export.sink.url=http://localhost:${server.port}/values", // in a webapp we have to explicitly enable the export "spring.cloud.function.web.export.enabled=true", // manually so we know the webapp is listening when we start "spring.cloud.function.web.export.autoStartup=false" }) -@ContextConfiguration(classes = { RestApplication.class, ApplicationConfiguration.class }) public class WebAppIntegrationTests { private static Log logger = LogFactory.getLog(WebAppIntegrationTests.class);