Various polishing to accomodate boot, reactor and other changes

This commit is contained in:
Oleg Zhurakousky
2020-08-04 19:19:36 +02:00
parent 55d20db47b
commit 2f36dbccb3
5 changed files with 53 additions and 47 deletions

View File

@@ -30,14 +30,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.FunctionRegistry;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.config.RoutingFunction;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
@@ -360,27 +357,28 @@ public abstract class AbstractComposableFunctionRegistry implements FunctionRegi
}
}
Object composedFunction = null;
if (a instanceof Supplier && b instanceof Function) {
Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
if (b instanceof FluxConsumer) {
if (supplier instanceof FluxSupplier) {
FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
composedFunction = (Supplier<Mono<Void>>) () -> Mono.from(
supplier.get().compose(v -> fConsumer.apply(supplier.get())));
}
else {
throw new IllegalStateException(
"The provided supplier is finite (i.e., already composed with Consumer) "
+ "therefore it can not be composed with another consumer");
}
}
else {
Function<Object, Object> function = (Function<Object, Object>) b;
composedFunction = (Supplier<Object>) () -> function
.apply(supplier.get());
}
}
else if (a instanceof Function && b instanceof Function) {
// if (a instanceof Supplier && b instanceof Function) {
// Supplier<Flux<Object>> supplier = (Supplier<Flux<Object>>) a;
// if (b instanceof FluxConsumer) {
// if (supplier instanceof FluxSupplier) {
// FluxConsumer<Object> fConsumer = ((FluxConsumer<Object>) b);
// composedFunction = (Supplier<Mono<Void>>) () -> Mono.from(
// supplier.get().compose(v -> fConsumer.apply(supplier.get())));
// }
// else {
// throw new IllegalStateException(
// "The provided supplier is finite (i.e., already composed with Consumer) "
// + "therefore it can not be composed with another consumer");
// }
// }
// else {
// Function<Object, Object> function = (Function<Object, Object>) b;
// composedFunction = (Supplier<Object>) () -> function
// .apply(supplier.get());
// }
// }
// else
if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;
Function<Object, Object> function2 = (Function<Object, Object>) b;
if (function1 instanceof FluxToMonoFunction) {

View File

@@ -22,6 +22,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -89,7 +90,11 @@ public class RoutingFunctionTests {
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".definition", "echoFlux").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
Assertions.assertThrows(Exception.class, resultFlux::subscribe);
StepVerifier
.create(resultFlux)
.expectError()
.verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -101,7 +106,10 @@ public class RoutingFunctionTests {
Message<String> message = MessageBuilder.withPayload("hello")
.setHeader(FunctionProperties.PREFIX + ".routing-expression", "'echoFlux'").build();
Flux resultFlux = (Flux) function.apply(Flux.just(message));
Assertions.assertThrows(Exception.class, resultFlux::subscribe);
StepVerifier
.create(resultFlux)
.expectError()
.verify();
}
@SuppressWarnings({ "unchecked", "rawtypes" })