Various polishing to accomodate boot, reactor and other changes

This commit is contained in:
Oleg Zhurakousky
2020-08-04 19:19:36 +02:00
parent 1670563de9
commit ddba54dee6
5 changed files with 53 additions and 47 deletions

View File

@@ -58,10 +58,6 @@
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>

View File

@@ -30,14 +30,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
@@ -360,27 +357,28 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
}
Object composedFunction = null;
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
composedFunction = (Supplier<Mono<Void>>) () -> Mono.from(
supplier.get().compose(v -> fConsumer.apply(supplier.get())));
}
else {
throw new IllegalStateException(
"The provided supplier is finite (i.e., already composed with Consumer) "
+ "therefore it can not be composed with another consumer");
}
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
composedFunction = (Supplier<Object>) () -> function
.apply(supplier.get());
}
}
else if (a instanceof Function && b instanceof Function) {
// if (a instanceof Supplier && b instanceof Function) {
// Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
// if (b instanceof FluxConsumer) {
// if (supplier instanceof FluxSupplier) {
// FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
// composedFunction = (Supplier<Mono<Void>>) () -> Mono.from(
// supplier.get().compose(v -> fConsumer.apply(supplier.get())));
// }
// else {
// throw new IllegalStateException(
// "The provided supplier is finite (i.e., already composed with Consumer) "
// + "therefore it can not be composed with another consumer");
// }
// }
// else {
// Function<Object, Object> function = (Function<Object, Object>) b;
// composedFunction = (Supplier<Object>) () -> function
// .apply(supplier.get());
// }
// }
// else
if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;
Function<Object, Object> function2 = (Function<Object, Object>) b;
if (function1 instanceof FluxToMonoFunction) {

View File

@@ -22,6 +22,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -89,7 +90,11 @@ public class RoutingFunctionTests {
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
Assertions.assertThrows(Exception.class, resultFlux::subscribe);
StepVerifier
.create(resultFlux)
.expectError()
.verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -101,7 +106,10 @@ public class RoutingFunctionTests {
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
Assertions.assertThrows(Exception.class, resultFlux::subscribe);
StepVerifier
.create(resultFlux)
.expectError()
.verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })

View File

@@ -29,6 +29,7 @@
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.10.RELEASE</version>
<optional>true</optional>
</dependency>
<dependency>

View File

@@ -16,8 +16,8 @@
package org.springframework.cloud.function.web.source;
import java.net.ConnectException;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;
@@ -28,6 +28,7 @@ import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.context.SmartLifecycle;
@@ -106,22 +107,24 @@ public class SupplierExporter implements SmartLifecycle {
}
if (suppliersPresent) {
this.subscription = streams
.retry(error -> {
/*
* The ConnectException may happen if a server is not yet available/reachable
* The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses
*/
boolean retry = error instanceof ConnectException || error instanceof ClassCastException
&& this.running;
if (!retry) {
this.ok = false;
if (!this.debug) {
logger.info(error);
}
stop();
}
return retry;
})
.retryWhen(Retry.backoff(5, Duration.ofSeconds(1)))
// .retry(error -> {
// /*
// * The ConnectException may happen if a server is not yet available/reachable
// * The ClassCast is to handle delayed Mono issued by HttpSupplier.transform for non-2xx responses
// */
// boolean retry = error instanceof ConnectException || error instanceof ClassCastException
// && this.running;
// if (!retry) {
// this.ok = false;
// if (!this.debug) {
// logger.info(error);
// }
// stop();
// }
// return retry;
// }
// )
.doOnComplete(() -> {
stop();
})