GH-609 Fix support for missmatched Publishers
Given that s-c-f-web always sends input as Flux, it creates issues for Function<Mono, Mono>, so this fixes it Resolves #609
This commit is contained in:
@@ -1088,6 +1088,12 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object convertInputPublisherIfNecessary(Publisher publisher, Type type) {
|
||||
if (FunctionTypeUtils.isMono(type) && publisher instanceof Flux) {
|
||||
publisher = Mono.from(publisher);
|
||||
}
|
||||
else if (FunctionTypeUtils.isFlux(type) && publisher instanceof Mono) {
|
||||
publisher = Flux.from(publisher);
|
||||
}
|
||||
Type actualType = type != null ? FunctionTypeUtils.getGenericType(type) : type;
|
||||
return publisher instanceof Mono
|
||||
? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType))
|
||||
|
||||
@@ -33,6 +33,7 @@ import java.util.stream.Collectors;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.function.Tuple2;
|
||||
@@ -530,6 +531,17 @@ public class BeanFactoryAwareFunctionRegistryTests {
|
||||
assertThat(((Person) config.consumerInputRef.get()).getName()).isEqualTo("Ricky");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGH_609() {
|
||||
FunctionCatalog catalog = this.configureCatalog(SampleFunctionConfiguration.class);
|
||||
Function<Publisher<String>, Publisher<String>> f = catalog.lookup("monoToMono");
|
||||
Mono<String> result = (Mono<String>) f.apply(Mono.just("hello"));
|
||||
assertThat(result.block()).isEqualTo("hello");
|
||||
|
||||
result = (Mono<String>) f.apply(Flux.just("hello"));
|
||||
assertThat(result.block()).isEqualTo("hello");
|
||||
}
|
||||
|
||||
@EnableAutoConfiguration
|
||||
public static class PojoToMessageFunctionCompositionConfiguration {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user