GH-692 Fix reactive support in Azure FunctionInvoker

Resolves #692
This commit is contained in:
Oleg Zhurakousky
2021-05-25 17:25:06 +02:00
parent 1e6bac470c
commit 7e5eaeeb49
6 changed files with 102 additions and 13 deletions

View File

@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
@@ -119,23 +120,28 @@ public class FunctionInvoker<I, O> {
Object enhancedInput = enhanceInputIfNecessary(input, executionContext);
Object output = function.apply(enhancedInput);
if (output instanceof Publisher && !function.isOutputTypePublisher()) {
List resultList = new ArrayList<>();
for (Object resultItem : Flux.from((Publisher) output).toIterable()) {
if (resultItem instanceof Collection) {
resultList.addAll((Collection) resultItem);
}
else {
if (Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
return (O) this.convertOutputIfNecessary(input, resultItem);
if (output instanceof Publisher) {
if (FunctionTypeUtils.isMono(function.getOutputType())) {
return (O) this.convertOutputIfNecessary(input, Mono.from((Publisher) output).blockOptional().get());
}
else {
List resultList = new ArrayList<>();
for (Object resultItem : Flux.from((Publisher) output).toIterable()) {
if (resultItem instanceof Collection) {
resultList.addAll((Collection) resultItem);
}
else {
resultList.add(resultItem);
if (Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
return (O) this.convertOutputIfNecessary(input, resultItem);
}
else {
resultList.add(resultItem);
}
}
}
return (O) this.convertOutputIfNecessary(input, resultList);
}
return (O) this.convertOutputIfNecessary(input, resultList);
}
return (O) this.convertOutputIfNecessary(input, output);
}

View File

@@ -28,6 +28,7 @@ import com.microsoft.azure.functions.ExecutionContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
@@ -144,6 +145,17 @@ public class FunctionInvokerTests {
assertThat(consumerResult).isEqualTo("foo1");
}
@Test
public void testReactiveFunctions() {
FunctionInvoker<String, String> handler = handler(ReactiveFunctionConfiguration.class);
String result = handler.handleRequest("hello", new TestExecutionContext("uppercaseMono"));
System.out.println(result);
// assertThat(result).isNull();
// assertThat(consumerResult).isEqualTo("foo1");
}
@AfterEach
public void close() throws IOException {
if (this.handler != null) {
@@ -161,6 +173,22 @@ public class FunctionInvokerTests {
}
@Configuration
@EnableAutoConfiguration
protected static class ReactiveFunctionConfiguration {
@Bean
public Function<Flux<String>, Flux<String>> echoStream() {
return f -> f;
}
@Bean
public Function<Mono<String>, Mono<String>> uppercaseMono() {
return f -> f.map(v -> v.toUpperCase());
}
}
@Configuration
protected static class NonFluxSupplierConfig {