GH-1141 Add support for composing reactive Supplier/Function with imperative Consumer

Resolves #1141
This commit is contained in:
Oleg Zhurakousky
2024-06-06 11:38:19 +02:00
parent faeb0f77ad
commit 6ffae9397a
2 changed files with 101 additions and 3 deletions

View File

@@ -394,7 +394,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
@SuppressWarnings("rawtypes")
public class FunctionInvocationWrapper implements Function<Object, Object>, Consumer<Object>, Supplier<Object>, Runnable {
private final Object target;
private Object target;
private Type inputType;
@@ -658,13 +658,21 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
|| FunctionTypeUtils.isMultipleArgumentType(((FunctionInvocationWrapper) after).outputType)) {
throw new UnsupportedOperationException("Composition of functions with multiple arguments is not supported at the moment");
}
FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after;
//see GH-1141 for this code snippet
if ((this.getTarget() instanceof Supplier || this.getTarget() instanceof Function) && FunctionTypeUtils.isPublisher(this.getOutputType())
&& afterWrapper.getTarget() instanceof Consumer && !FunctionTypeUtils.isPublisher(afterWrapper.getInputType())) {
Consumer wrapper = new ConsumerWrapper((Consumer) afterWrapper.getTarget());
afterWrapper.target = wrapper;
afterWrapper.inputType = this.outputType;
}
//
this.setSkipOutputConversion(true);
((FunctionInvocationWrapper) after).setSkipOutputConversion(true);
Function rawComposedFunction = v -> ((FunctionInvocationWrapper) after).doApply(doApply(v));
FunctionInvocationWrapper afterWrapper = (FunctionInvocationWrapper) after;
Type composedFunctionType;
if (afterWrapper.outputType == null) {
composedFunctionType = (this.inputType == null) ?
@@ -1551,4 +1559,20 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
return t;
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private static class ConsumerWrapper implements Consumer<Flux<Object>> {
private final Consumer targetConsumer;
ConsumerWrapper(Consumer targetConsumer) {
this.targetConsumer = targetConsumer;
}
@Override
public void accept(Flux messageFlux) {
messageFlux.doOnNext(this.targetConsumer).subscribe();
}
}
}

View File

@@ -140,6 +140,37 @@ public class BeanFactoryAwareFunctionRegistryTests {
assertThat(registration.getNames().iterator().next()).isEqualTo("echo1");
}
@Test
public void testCompositionReactiveSupplierWithImplicitConsumer() throws Exception {
FunctionCatalog catalog = this.configureCatalog(CompositionReactiveSupplierWithConsumer.class);
FunctionInvocationWrapper function = catalog.lookup("supplyPrimitive|consume");
function.apply(null);
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo(1);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo(2);
CompositionReactiveSupplierWithConsumer.results.clear();
function = catalog.lookup("supplyMessage|consume");
function.apply(null);
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo(1);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo(2);
CompositionReactiveSupplierWithConsumer.results.clear();
function = catalog.lookup("functionMessage|consume");
function.apply(Flux.fromArray(new Message[] {MessageBuilder.withPayload("ricky").build(), MessageBuilder.withPayload("bubbles").build()}));
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo("RICKY");
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo("BUBBLES");
CompositionReactiveSupplierWithConsumer.results.clear();
function = catalog.lookup("functionPrimitive|consume");
function.apply(Flux.fromArray(new String[] {"ricky", "bubbles"}));
assertThat(CompositionReactiveSupplierWithConsumer.results.size()).isEqualTo(2);
assertThat(CompositionReactiveSupplierWithConsumer.results.get(0)).isEqualTo("RICKY");
assertThat(CompositionReactiveSupplierWithConsumer.results.get(1)).isEqualTo("BUBBLES");
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testMessageWithArrayAsPayload() throws Exception {
@@ -1539,6 +1570,49 @@ public class BeanFactoryAwareFunctionRegistryTests {
}
}
@EnableAutoConfiguration
@Configuration // s-c-f-1141
@SuppressWarnings({"unchecked", "rawtypes"})
public static class CompositionReactiveSupplierWithConsumer {
private static List results = new ArrayList<>();
@Bean
public Function<Flux<String>, Flux<String>> functionPrimitive() {
return flux -> flux.map(v -> v.toUpperCase());
}
@Bean
public Function<Flux<Message<String>>, Flux<Message<String>>> functionMessage() {
return flux -> flux.map(v -> MessageBuilder.withPayload(v.getPayload().toUpperCase()).build());
}
@Bean
public Supplier<Flux<Message<Integer>>> supplyMessage() {
return () -> {
return Flux.fromArray(
new Message[] { MessageBuilder.withPayload(1).build(), MessageBuilder.withPayload(2).build() });
};
}
@Bean
public Supplier<Flux<Integer>> supplyPrimitive() {
return () -> {
return Flux.fromArray(
new Integer[] { 1, 2});
};
}
@Bean
public Consumer consume() {
return v -> {
if (v instanceof Message vMessage) {
v = vMessage.getPayload();
}
results.add(v);
};
}
}
@EnableAutoConfiguration
@Configuration
public static class MessageWithArrayAsPayload {