GH-56 Added support for Function<Flux, Mono> and <Mono, Flux>

Resolves #56
Resolves #218
This commit is contained in:
Oleg Zhurakousky
2018-10-15 09:34:06 -04:00
parent baabff9a40
commit ae07a13d03
4 changed files with 174 additions and 1 deletions

View File

@@ -52,9 +52,11 @@ import org.springframework.cloud.function.context.catalog.FunctionUnregistration
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxSupplier;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.IsolatedConsumer;
import org.springframework.cloud.function.core.IsolatedFunction;
import org.springframework.cloud.function.core.IsolatedSupplier;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.cloud.function.json.GsonMapper;
import org.springframework.cloud.function.json.JacksonMapper;
import org.springframework.context.ApplicationEventPublisher;
@@ -349,7 +351,22 @@ public class ContextFunctionCatalogAutoConfiguration {
else if (a instanceof Function && b instanceof Function) {
Function<Object, Object> function1 = (Function<Object, Object>) a;
Function<Object, Object> function2 = (Function<Object, Object>) b;
return function1.andThen(function2);
if (function1 instanceof FluxToMonoFunction) {
if (function2 instanceof MonoToFluxFunction) {
return function1.andThen(function2);
}
else {
throw new IllegalStateException("The provided function is finite (i.e., returns Mono<?>) "
+ "therefore it can *only* be composed with compatible function (i.e., Function<Mono, Flux>");
}
}
else if (function2 instanceof FluxToMonoFunction) {
return new FluxToMonoFunction<Object, Object>(((Function<Flux<Object>, Flux<Object>>)a)
.andThen(((FluxToMonoFunction<Object,Object>) b).getTarget()));
}
else {
return function1.andThen(function2);
}
}
else if (a instanceof Function && b instanceof Consumer) {
Function<Object, Object> function = (Function<Object, Object>) a;
@@ -490,6 +507,12 @@ public class ContextFunctionCatalogAutoConfiguration {
}
registration.target(target);
}
if (Mono.class.isAssignableFrom(type.getOutputWrapper())) {
registration.target(new FluxToMonoFunction<>((Function) target));
}
else if (Mono.class.isAssignableFrom(type.getInputWrapper())) {
registration.target(new MonoToFluxFunction<>((Function) target));
}
return registration;
}

View File

@@ -120,6 +120,33 @@ public class BeanFactoryFunctionCatalogTests {
assertThat(foos.apply(Flux.just(2)).blockFirst()).isEqualTo("Hello 4");
}
@Test
public void composeWithFiniteFunction() {
Function<String, String> func1 = x -> x.toUpperCase();
processor.register(new FunctionRegistration<>(func1, "func1"));
processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
Function<Flux<String>, Mono<Long>> foos = processor.lookup(Function.class, "func1,func2");
assertThat(foos.apply(Flux.fromArray(new String[] {"a", "b", "c"})).block()).isEqualTo(3);
}
@Test
public void composeWithFiniteFunctionAndContinueWithCompatible() {
Function<String, String> func1 = x -> x.toUpperCase();
processor.register(new FunctionRegistration<>(func1, "func1"));
processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
processor.register(new FunctionRegistration<>(new MonoThenFluxFunction(), "func3"));
Function<Flux<String>, Flux<Integer>> foos = processor.lookup(Function.class, "func1,func2,func3");
assertThat(foos.apply(Flux.fromArray(new String[] {"a", "b", "c"})).collectList().block().size()).isEqualTo(3);
}
@Test(expected=IllegalStateException.class)
public void composeIncompatibleFunctions() {
Function<String, String> func1 = x -> x.toUpperCase();
processor.register(new FunctionRegistration<>(func1, "func1"));
processor.register(new FunctionRegistration<>(new FluxThenMonoFunction(), "func2"));
processor.lookup(Function.class, "func2,func1");
}
@Test
public void composeSupplier() {
processor.register(new FunctionRegistration<>(new Source(), "numbers"));
@@ -231,4 +258,20 @@ public class BeanFactoryFunctionCatalogTests {
}
protected static class FluxThenMonoFunction implements Function<Flux<String>, Mono<Long>> {
@Override
public Mono<Long> apply(Flux<String> t) {
return t.count();
}
}
protected static class MonoThenFluxFunction implements Function<Mono<Long>, Flux<Integer>> {
@Override
public Flux<Integer> apply(Mono<Long> t) {
return Flux.range(0, Integer.parseInt(t.block().toString()));
}
}
}