diff --git a/pom.xml b/pom.xml index afe78af55..4f6e79717 100644 --- a/pom.xml +++ b/pom.xml @@ -58,10 +58,6 @@ - - org.codehaus.mojo - flatten-maven-plugin - org.apache.maven.plugins maven-checkstyle-plugin diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java index 363434abb..c1702d522 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/AbstractComposableFunctionRegistry.java @@ -30,14 +30,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import org.springframework.cloud.function.context.FunctionRegistration; import org.springframework.cloud.function.context.FunctionRegistry; import org.springframework.cloud.function.context.FunctionType; import org.springframework.cloud.function.context.config.RoutingFunction; -import org.springframework.cloud.function.core.FluxConsumer; -import org.springframework.cloud.function.core.FluxSupplier; import org.springframework.cloud.function.core.FluxToMonoFunction; import org.springframework.cloud.function.core.IsolatedConsumer; import org.springframework.cloud.function.core.IsolatedFunction; @@ -360,27 +357,28 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi } } Object composedFunction = null; - if (a instanceof Supplier && b instanceof Function) { - Supplier> supplier = (Supplier>) a; - if (b instanceof FluxConsumer) { - if (supplier instanceof FluxSupplier) { - FluxConsumer fConsumer = ((FluxConsumer) b); - composedFunction = (Supplier>) () -> Mono.from( - supplier.get().compose(v -> fConsumer.apply(supplier.get()))); - } - else { - throw new IllegalStateException( - "The provided supplier is finite (i.e., already composed with Consumer) " - + "therefore it can not be composed with another consumer"); - } - } - else { - Function function = (Function) b; - composedFunction = (Supplier) () -> function - .apply(supplier.get()); - } - } - else if (a instanceof Function && b instanceof Function) { +// if (a instanceof Supplier && b instanceof Function) { +// Supplier> supplier = (Supplier>) a; +// if (b instanceof FluxConsumer) { +// if (supplier instanceof FluxSupplier) { +// FluxConsumer fConsumer = ((FluxConsumer) b); +// composedFunction = (Supplier>) () -> Mono.from( +// supplier.get().compose(v -> fConsumer.apply(supplier.get()))); +// } +// else { +// throw new IllegalStateException( +// "The provided supplier is finite (i.e., already composed with Consumer) " +// + "therefore it can not be composed with another consumer"); +// } +// } +// else { +// Function function = (Function) b; +// composedFunction = (Supplier) () -> function +// .apply(supplier.get()); +// } +// } +// else + if (a instanceof Function && b instanceof Function) { Function function1 = (Function) a; Function function2 = (Function) b; if (function1 instanceof FluxToMonoFunction) { diff --git a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java index ebfd047c1..1b098a33d 100644 --- a/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java +++ b/spring-cloud-function-context/src/test/java/org/springframework/cloud/function/context/config/RoutingFunctionTests.java @@ -22,6 +22,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -89,7 +90,11 @@ public class RoutingFunctionTests { Message message = MessageBuilder.withPayload("hello") .setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build(); Flux resultFlux = (Flux) function.apply(Flux.just(message)); - Assertions.assertThrows(Exception.class, resultFlux::subscribe); + + StepVerifier + .create(resultFlux) + .expectError() + .verify(); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -101,7 +106,10 @@ public class RoutingFunctionTests { Message message = MessageBuilder.withPayload("hello") .setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build(); Flux resultFlux = (Flux) function.apply(Flux.just(message)); - Assertions.assertThrows(Exception.class, resultFlux::subscribe); + StepVerifier + .create(resultFlux) + .expectError() + .verify(); } @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/spring-cloud-function-web/pom.xml b/spring-cloud-function-web/pom.xml index afd37baf4..c405facd0 100644 --- a/spring-cloud-function-web/pom.xml +++ b/spring-cloud-function-web/pom.xml @@ -29,6 +29,7 @@ io.projectreactor.netty reactor-netty + 0.9.10.RELEASE true diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java index accf40651..04fe8fece 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/source/SupplierExporter.java @@ -16,8 +16,8 @@ package org.springframework.cloud.function.web.source; -import java.net.ConnectException; import java.net.URI; +import java.time.Duration; import java.util.Collections; import java.util.Set; import java.util.function.Supplier; @@ -28,6 +28,7 @@ import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.context.SmartLifecycle; @@ -106,22 +107,24 @@ public class SupplierExporter implements SmartLifecycle { } if (suppliersPresent) { this.subscription = streams - .retry(error -> { - /* - * The ConnectException may happen if a server is not yet available/reachable - * The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses - */ - boolean retry = error instanceof ConnectException || error instanceof ClassCastException - && this.running; - if (!retry) { - this.ok = false; - if (!this.debug) { - logger.info(error); - } - stop(); - } - return retry; - }) + .retryWhen(Retry.backoff(5, Duration.ofSeconds(1))) +// .retry(error -> { +// /* +// * The ConnectException may happen if a server is not yet available/reachable +// * The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses +// */ +// boolean retry = error instanceof ConnectException || error instanceof ClassCastException +// && this.running; +// if (!retry) { +// this.ok = false; +// if (!this.debug) { +// logger.info(error); +// } +// stop(); +// } +// return retry; +// } +// ) .doOnComplete(() -> { stop(); })