Ability to extend default function result handling

Fixes #757
This commit is contained in:
onobc
2022-02-28 19:49:41 -06:00
committed by Oleg Zhurakousky
parent a364aaf86a
commit 408035b94a
4 changed files with 380 additions and 31 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2021-2021 the original author or authors.
* Copyright 2021-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -64,6 +64,7 @@ import org.springframework.util.StringUtils;
* @param <I> input type
* @param <O> result type
* @author Oleg Zhurakousky
* @author Chris Bono
* @since 3.2
*/
public class FunctionInvoker<I, O> {
@@ -119,37 +120,117 @@ public class FunctionInvoker<I, O> {
return function;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@SuppressWarnings({"unchecked", "rawtypes"})
public O handleRequest(I input, ExecutionContext executionContext) {
String functionDefinition = executionContext.getFunctionName();
FunctionInvocationWrapper function = this.discoverFunction(functionDefinition);
Object enhancedInput = enhanceInputIfNecessary(input, executionContext);
Object output = function.apply(enhancedInput);
if (output instanceof Publisher) {
if (FunctionTypeUtils.isMono(function.getOutputType())) {
return (O) this.convertOutputIfNecessary(input, Mono.from((Publisher) output).blockOptional().get());
Object functionResult = function.apply(enhancedInput);
if (!(functionResult instanceof Publisher)) {
return postProcessImperativeFunctionResult(input, enhancedInput, functionResult, function, executionContext);
}
return postProcessReactiveFunctionResult(input, enhancedInput, (Publisher<?>) functionResult, function, executionContext);
}
/**
* Post-processes the result from a non-reactive function invocation before returning it to the Azure
* runtime. The default behavior is to {@link #convertOutputIfNecessary possibly convert} the result.
*
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
*
* @param rawInputs the inputs passed in from the Azure runtime
* @param functionInputs the actual inputs used for the function invocation; may be
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
* @param functionResult the result from the function invocation
* @param function the invoked function
* @param executionContext the Azure execution context
* @return the possibly modified function results
*/
protected O postProcessImperativeFunctionResult(I rawInputs, Object functionInputs, Object functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
return (O) this.convertOutputIfNecessary(rawInputs, functionResult);
}
/**
* Post-processes the result from a reactive function invocation before returning it to the Azure
* runtime. The default behavior is to delegate to {@link #postProcessMonoFunctionResult} or
* {@link #postProcessFluxFunctionResult} based on the result type.
*
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
*
* @param rawInputs the inputs passed in from the Azure runtime
* @param functionInputs the actual inputs used for the function invocation; may be
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
* @param functionResult the result from the function invocation
* @param function the invoked function
* @param executionContext the Azure execution context
* @return the possibly modified function results
*/
protected O postProcessReactiveFunctionResult(I rawInputs, Object functionInputs, Publisher<?> functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
if (FunctionTypeUtils.isMono(function.getOutputType())) {
return postProcessMonoFunctionResult(rawInputs, functionInputs, Mono.from(functionResult), function, executionContext);
}
return postProcessFluxFunctionResult(rawInputs, functionInputs, Flux.from(functionResult), function, executionContext);
}
/**
* Post-processes the {@code Mono} result from a reactive function invocation before returning it to the Azure
* runtime. The default behavior is to {@link Mono#blockOptional()} and {@link #convertOutputIfNecessary possibly convert} the result.
*
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
*
* @param rawInputs the inputs passed in from the Azure runtime
* @param functionInputs the actual inputs used for the function invocation; may be
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
* @param functionResult the Mono result from the function invocation
* @param function the invoked function
* @param executionContext the Azure execution context
* @return the possibly modified function results
*/
protected O postProcessMonoFunctionResult(I rawInputs, Object functionInputs, Mono<?> functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
return (O) this.convertOutputIfNecessary(rawInputs, functionResult.blockOptional().get());
}
/**
* Post-processes the {@code Flux} result from a reactive function invocation before returning it to the Azure
* runtime. The default behavior is to {@link Flux#toIterable() block} and {@link #convertOutputIfNecessary possibly convert} the results.
*
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
*
* @param rawInputs the inputs passed in from the Azure runtime
* @param functionInputs the actual inputs used for the function invocation; may be
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
* @param functionResult the Mono result from the function invocation
* @param function the invoked function
* @param executionContext the Azure execution context
* @return the possibly modified function results
*/
protected O postProcessFluxFunctionResult(I rawInputs, Object functionInputs, Flux<?> functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
List resultList = new ArrayList<>();
for (Object resultItem : functionResult.toIterable()) {
if (resultItem instanceof Collection) {
resultList.addAll((Collection) resultItem);
}
else {
List resultList = new ArrayList<>();
for (Object resultItem : Flux.from((Publisher) output).toIterable()) {
if (resultItem instanceof Collection) {
resultList.addAll((Collection) resultItem);
}
else {
if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
return (O) this.convertOutputIfNecessary(input, resultItem);
}
else {
resultList.add(resultItem);
}
}
if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
return (O) this.convertOutputIfNecessary(rawInputs, resultItem);
}
else {
resultList.add(resultItem);
}
return (O) this.convertOutputIfNecessary(input, resultList);
}
}
return (O) this.convertOutputIfNecessary(input, output);
return (O) this.convertOutputIfNecessary(rawInputs, resultList);
}
@SuppressWarnings({ "unchecked", "rawtypes" })

View File

@@ -0,0 +1,200 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.adapter.azure;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.microsoft.azure.functions.ExecutionContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.support.GenericMessage;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.util.Lists.list;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
* Unit tests for {@link FunctionInvoker} custom result handling.
*
* @author Chris Bono
*/
class CustomFunctionInvokerTests {
private FunctionInvoker<?, ?> currentInvoker;
@AfterEach
void closeCurrentInvoker() {
if (this.currentInvoker != null) {
this.currentInvoker.close();
}
}
/**
* Verifies custom result handling and proper post-process callback invocation for an imperative function.
*/
@Test
void customImperativeResultHandling() {
FunctionInvoker<String, String> invoker = new FunctionInvoker<String, String>(TestFunctionsConfig.class) {
@Override
protected String postProcessImperativeFunctionResult(String rawInputs, Object functionInputs,
Object functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext
) {
return functionResult + "+imperative";
}
};
invoker = spyOnAndCloseAfterTest(invoker);
ExecutionContext executionContext = new TestExecutionContext("imperativeUppercase");
String result = invoker.handleRequest("foo", executionContext);
assertThat(result).isEqualTo("FOO+imperative");
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
// Only imperative post-process callback should be called
verify(invoker, never()).postProcessReactiveFunctionResult(anyString(), any(), any(Publisher.class), any(), same(executionContext));
verify(invoker, never()).postProcessMonoFunctionResult(anyString(), any(), any(Mono.class), any(), same(executionContext));
verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext));
// Only sniff-test the payload of the input message (the other fields are problematic to verify and no value doing that here)
ArgumentCaptor<GenericMessage> functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class);
verify(invoker).postProcessImperativeFunctionResult(eq("foo"), functionInputsCaptor.capture(), eq("FOO"), any(), same(executionContext));
assertThat(functionInputsCaptor.getValue()).extracting(GenericMessage::getPayload).isEqualTo("foo");
}
/**
* Verifies custom result handling and proper post-process callback invocation for a reactive Mono function.
*/
@Test
void customReactiveMonoResultHandling() {
FunctionInvoker<String, String> invoker = new FunctionInvoker<String, String>(TestFunctionsConfig.class) {
@Override
protected String postProcessMonoFunctionResult(String rawInputs, Object functionInputs, Mono<?> functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
return functionResult.block().toString() + "+mono";
}
};
invoker = spyOnAndCloseAfterTest(invoker);
ExecutionContext executionContext = new TestExecutionContext("reactiveMonoUppercase");
String result = invoker.handleRequest("foo", executionContext);
assertThat(result).isEqualTo("FOO+mono");
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
// Only publisher->mono post-process callbacks should be called
verify(invoker, never()).postProcessImperativeFunctionResult(anyString(), any(), any(), any(), same(executionContext));
verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext));
// Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here)
ArgumentCaptor<GenericMessage> functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class);
ArgumentCaptor<Mono> functionResultCaptor = ArgumentCaptor.forClass(Mono.class);
verify(invoker).postProcessReactiveFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
verify(invoker).postProcessMonoFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
// NOTE: The captors get called twice as the args are just delegated from publisher->mono callback
assertThat(functionInputsCaptor.getAllValues()).extracting(GenericMessage::getPayload).containsExactly("foo", "foo");
assertThat(functionResultCaptor.getAllValues()).extracting(Mono::block).containsExactly("FOO", "FOO");
}
/**
* Verifies custom result handling and proper post-process callback invocation for a reactive Flux function.
*/
@Test
void customReactiveFluxResultHandling() {
FunctionInvoker<List<String>, String> invoker = new FunctionInvoker<List<String>, String>(TestFunctionsConfig.class) {
@Override
protected String postProcessFluxFunctionResult(List<String> rawInputs, Object functionInputs,
Flux<?> functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext
) {
return functionResult.map(o -> o.toString() + "+flux").collectList().block().stream().collect(Collectors.joining("/"));
}
};
invoker = spyOnAndCloseAfterTest(invoker);
ExecutionContext executionContext = new TestExecutionContext("reactiveFluxUppercase");
List<String> rawInputs = Arrays.asList("foo", "bar");
String result = invoker.handleRequest(rawInputs, executionContext);
assertThat(result).isEqualTo("FOO+flux/BAR+flux");
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
// Only publisher->flux post-process callbacks should be called
verify(invoker, never()).postProcessImperativeFunctionResult(anyList(), any(), any(), any(), same(executionContext));
verify(invoker, never()).postProcessMonoFunctionResult(anyList(), any(), any(Mono.class), any(), same(executionContext));
// Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here)
ArgumentCaptor<Flux<GenericMessage>> functionInputsCaptor = ArgumentCaptor.forClass(Flux.class);
ArgumentCaptor<Flux> functionResultCaptor = ArgumentCaptor.forClass(Flux.class);
verify(invoker).postProcessReactiveFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
verify(invoker).postProcessFluxFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
// NOTE: The captors get called twice as the args are just delegated from publisher->flux callback
// The functionInputs for each call is Flux<GreetingMessage> with 2 items - one for 'foo' and one for 'bar'
assertThat(functionInputsCaptor.getAllValues())
.extracting(Flux::collectList).extracting(Mono::block)
.flatExtracting(fluxAsList -> fluxAsList.stream().collect(Collectors.toList()))
.extracting(GenericMessage::getPayload).containsExactlyInAnyOrder("foo", "bar", "foo", "bar");
// The functionResult for each call is a Flux<String> w/ 2 items { "FOO", "BAR" }
assertThat(functionResultCaptor.getAllValues())
.extracting(Flux::collectList).extracting(Mono::block)
.containsExactlyInAnyOrder(list("FOO", "BAR"), list("FOO", "BAR"));
}
private <I, O> FunctionInvoker<I, O> spyOnAndCloseAfterTest(FunctionInvoker<I, O> invoker) {
this.currentInvoker = invoker;
return spy(invoker);
}
@Configuration
@EnableAutoConfiguration
static class TestFunctionsConfig {
@Bean
public Function<String, String> imperativeUppercase() {
return (s) -> s.toUpperCase();
}
@Bean
public Function<Mono<String>, Mono<String>> reactiveMonoUppercase() {
return (m) -> m.map(String::toUpperCase);
}
@Bean
public Function<Flux<String>, Flux<String>> reactiveFluxUppercase() {
return (f) -> f.map(String::toUpperCase);
}
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,16 +19,20 @@ package example;
import java.util.Map;
import java.util.function.Function;
import com.microsoft.azure.functions.ExecutionContext;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import com.microsoft.azure.functions.ExecutionContext;
import reactor.core.publisher.Mono;
/**
* @author Oleg Zhurakousky
* @author Chris Bono
*/
@SpringBootApplication
public class Config {
@@ -67,13 +71,14 @@ public class Config {
};
}
@Bean
public Function<Mono<String>, Mono<String>> uppercaseReactive() {
return mono -> mono.map(value -> {
return value.toUpperCase();
});
return mono -> mono.map(value -> value.toUpperCase());
}
@Bean
public Function<Flux<String>, Flux<String>> echoStream() {
return flux -> flux.map(value -> value.toUpperCase());
}
}

View File

@@ -0,0 +1,63 @@
/*
* Copyright 2022-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example;
import java.util.List;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.HttpMethod;
import com.microsoft.azure.functions.HttpRequestMessage;
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.HttpTrigger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
/**
* Sample that shows how to customize the default function result handling by operating on the {@link Flux} returned
* from the {@link Config#echoStream()} echoStream} function.
*
* @author Chris Bono
*/
public class ReactiveEchoCustomResultHandler extends FunctionInvoker<List<String>, String> {
private static final Log logger = LogFactory.getLog(ReactiveEchoCustomResultHandler.class);
@FunctionName("echoStream")
public String execute(@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
HttpRequestMessage<List<String>> request, ExecutionContext context
) {
return handleRequest(request.getBody(), context);
}
@Override
protected String postProcessFluxFunctionResult(List<String> rawInputs, Object functionInputs, Flux<?> functionResult,
FunctionInvocationWrapper function, ExecutionContext executionContext
) {
functionResult
.doFirst(() -> executionContext.getLogger().info("BEGIN echo post-processing work ..."))
.mapNotNull((v) -> v.toString().toUpperCase())
.doFinally((signalType) -> executionContext.getLogger().info("END echo post-processing work"))
.subscribe((v) -> executionContext.getLogger().info(" " + v));
return "Kicked off job for " + rawInputs;
}
}