SimpleFunctionRegistry: Fixed: compose of supplier...consumer pipeline produces a supplier type. This fix allows testing of composed pipelines without input and output
SimpleFunctionRegistry: added info. No functional changes Resolves #809
This commit is contained in:
committed by
Oleg Zhurakousky
parent
a1eddbaf76
commit
b8856bf0e7
@@ -23,10 +23,12 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.gson.Gson;
|
||||
@@ -527,6 +529,30 @@ public class SimpleFunctionRegistryTests {
|
||||
assertThat(FunctionTypeUtils.isMono(function.getOutputType()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFunctionCompositionWithReactiveSupplierAndConsumer() {
|
||||
SimpleFunctionRegistry catalog = new SimpleFunctionRegistry(this.conversionService, this.messageConverter,
|
||||
new JacksonMapper(new ObjectMapper()));
|
||||
|
||||
Object reactiveFunc = reactiveFluxSupplier();
|
||||
FunctionRegistration functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxSupplier")
|
||||
.type(ResolvableType.forClassWithGenerics(
|
||||
Supplier.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType());
|
||||
catalog.register(functionRegistration);
|
||||
|
||||
reactiveFunc = reactiveFluxConsumer();
|
||||
functionRegistration = new FunctionRegistration(reactiveFunc, "reactiveFluxConsumer")
|
||||
.type(ResolvableType.forClassWithGenerics(
|
||||
Consumer.class, ResolvableType.forClassWithGenerics(Flux.class, String.class)).getType());
|
||||
catalog.register(functionRegistration);
|
||||
|
||||
FunctionInvocationWrapper lookedUpFunction = catalog
|
||||
.lookup("reactiveFluxSupplier|reactiveFluxConsumer");
|
||||
|
||||
assertThat(lookedUpFunction).isNotNull();
|
||||
lookedUpFunction.apply(null);
|
||||
assertThat(consumerDowncounter.get()).isZero();
|
||||
}
|
||||
|
||||
public Function<String, String> uppercase() {
|
||||
return v -> v.toUpperCase();
|
||||
@@ -551,6 +577,18 @@ public class SimpleFunctionRegistryTests {
|
||||
});
|
||||
}
|
||||
|
||||
private final AtomicInteger consumerDowncounter = new AtomicInteger(10);
|
||||
|
||||
public Supplier<Flux<String>> reactiveFluxSupplier() {
|
||||
return () -> Flux.fromStream(
|
||||
IntStream.range(0, consumerDowncounter.get()).boxed().map(i -> Integer.toString(i))
|
||||
);
|
||||
}
|
||||
|
||||
public Consumer<Flux<String>> reactiveFluxConsumer() {
|
||||
return flux -> flux.subscribe(v -> consumerDowncounter.decrementAndGet());
|
||||
}
|
||||
|
||||
private FunctionCatalog configureCatalog(Class<?>... configClass) {
|
||||
ApplicationContext context = new SpringApplicationBuilder(configClass)
|
||||
.run("--logging.level.org.springframework.cloud.function=DEBUG",
|
||||
|
||||
Reference in New Issue
Block a user