From 408035b94a679f5f86033702267f9db3ce16dbca Mon Sep 17 00:00:00 2001 From: onobc Date: Mon, 28 Feb 2022 19:49:41 -0600 Subject: [PATCH] Ability to extend default function result handling Fixes #757 --- .../adapter/azure/FunctionInvoker.java | 125 +++++++++-- .../azure/CustomFunctionInvokerTests.java | 200 ++++++++++++++++++ .../src/main/java/example/Config.java | 23 +- .../ReactiveEchoCustomResultHandler.java | 63 ++++++ 4 files changed, 380 insertions(+), 31 deletions(-) create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/CustomFunctionInvokerTests.java create mode 100644 spring-cloud-function-samples/function-sample-azure/src/main/java/example/ReactiveEchoCustomResultHandler.java diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/FunctionInvoker.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/FunctionInvoker.java index a400aa797..6a9c76c83 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/FunctionInvoker.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/main/java/org/springframework/cloud/function/adapter/azure/FunctionInvoker.java @@ -1,5 +1,5 @@ /* - * Copyright 2021-2021 the original author or authors. + * Copyright 2021-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,6 +64,7 @@ import org.springframework.util.StringUtils; * @param input type * @param result type * @author Oleg Zhurakousky + * @author Chris Bono * @since 3.2 */ public class FunctionInvoker { @@ -119,37 +120,117 @@ public class FunctionInvoker { return function; } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({"unchecked", "rawtypes"}) public O handleRequest(I input, ExecutionContext executionContext) { String functionDefinition = executionContext.getFunctionName(); FunctionInvocationWrapper function = this.discoverFunction(functionDefinition); Object enhancedInput = enhanceInputIfNecessary(input, executionContext); - Object output = function.apply(enhancedInput); - if (output instanceof Publisher) { - if (FunctionTypeUtils.isMono(function.getOutputType())) { - return (O) this.convertOutputIfNecessary(input, Mono.from((Publisher) output).blockOptional().get()); + Object functionResult = function.apply(enhancedInput); + + if (!(functionResult instanceof Publisher)) { + return postProcessImperativeFunctionResult(input, enhancedInput, functionResult, function, executionContext); + } + return postProcessReactiveFunctionResult(input, enhancedInput, (Publisher) functionResult, function, executionContext); + } + + /** + * Post-processes the result from a non-reactive function invocation before returning it to the Azure + * runtime. The default behavior is to {@link #convertOutputIfNecessary possibly convert} the result. + * + *

Provides a hook for custom function invokers to extend/modify the function results handling. + * + * @param rawInputs the inputs passed in from the Azure runtime + * @param functionInputs the actual inputs used for the function invocation; may be + * {@link #enhanceInputIfNecessary different} from the {@literal rawInputs} + * @param functionResult the result from the function invocation + * @param function the invoked function + * @param executionContext the Azure execution context + * @return the possibly modified function results + */ + protected O postProcessImperativeFunctionResult(I rawInputs, Object functionInputs, Object functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + return (O) this.convertOutputIfNecessary(rawInputs, functionResult); + } + + /** + * Post-processes the result from a reactive function invocation before returning it to the Azure + * runtime. The default behavior is to delegate to {@link #postProcessMonoFunctionResult} or + * {@link #postProcessFluxFunctionResult} based on the result type. + * + *

Provides a hook for custom function invokers to extend/modify the function results handling. + * + * @param rawInputs the inputs passed in from the Azure runtime + * @param functionInputs the actual inputs used for the function invocation; may be + * {@link #enhanceInputIfNecessary different} from the {@literal rawInputs} + * @param functionResult the result from the function invocation + * @param function the invoked function + * @param executionContext the Azure execution context + * @return the possibly modified function results + */ + protected O postProcessReactiveFunctionResult(I rawInputs, Object functionInputs, Publisher functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + if (FunctionTypeUtils.isMono(function.getOutputType())) { + return postProcessMonoFunctionResult(rawInputs, functionInputs, Mono.from(functionResult), function, executionContext); + } + return postProcessFluxFunctionResult(rawInputs, functionInputs, Flux.from(functionResult), function, executionContext); + } + + /** + * Post-processes the {@code Mono} result from a reactive function invocation before returning it to the Azure + * runtime. The default behavior is to {@link Mono#blockOptional()} and {@link #convertOutputIfNecessary possibly convert} the result. + * + *

Provides a hook for custom function invokers to extend/modify the function results handling. + * + * @param rawInputs the inputs passed in from the Azure runtime + * @param functionInputs the actual inputs used for the function invocation; may be + * {@link #enhanceInputIfNecessary different} from the {@literal rawInputs} + * @param functionResult the Mono result from the function invocation + * @param function the invoked function + * @param executionContext the Azure execution context + * @return the possibly modified function results + */ + protected O postProcessMonoFunctionResult(I rawInputs, Object functionInputs, Mono functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + return (O) this.convertOutputIfNecessary(rawInputs, functionResult.blockOptional().get()); + } + + /** + * Post-processes the {@code Flux} result from a reactive function invocation before returning it to the Azure + * runtime. The default behavior is to {@link Flux#toIterable() block} and {@link #convertOutputIfNecessary possibly convert} the results. + * + *

Provides a hook for custom function invokers to extend/modify the function results handling. + * + * @param rawInputs the inputs passed in from the Azure runtime + * @param functionInputs the actual inputs used for the function invocation; may be + * {@link #enhanceInputIfNecessary different} from the {@literal rawInputs} + * @param functionResult the Mono result from the function invocation + * @param function the invoked function + * @param executionContext the Azure execution context + * @return the possibly modified function results + */ + protected O postProcessFluxFunctionResult(I rawInputs, Object functionInputs, Flux functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + List resultList = new ArrayList<>(); + for (Object resultItem : functionResult.toIterable()) { + if (resultItem instanceof Collection) { + resultList.addAll((Collection) resultItem); } else { - List resultList = new ArrayList<>(); - for (Object resultItem : Flux.from((Publisher) output).toIterable()) { - if (resultItem instanceof Collection) { - resultList.addAll((Collection) resultItem); - } - else { - if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType())) - && !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) { - return (O) this.convertOutputIfNecessary(input, resultItem); - } - else { - resultList.add(resultItem); - } - } + if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType())) + && !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) { + return (O) this.convertOutputIfNecessary(rawInputs, resultItem); + } + else { + resultList.add(resultItem); } - return (O) this.convertOutputIfNecessary(input, resultList); } } - return (O) this.convertOutputIfNecessary(input, output); + return (O) this.convertOutputIfNecessary(rawInputs, resultList); } @SuppressWarnings({ "unchecked", "rawtypes" }) diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/CustomFunctionInvokerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/CustomFunctionInvokerTests.java new file mode 100644 index 000000000..3d39543ec --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-azure/src/test/java/org/springframework/cloud/function/adapter/azure/CustomFunctionInvokerTests.java @@ -0,0 +1,200 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.azure; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.microsoft.azure.functions.ExecutionContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.messaging.support.GenericMessage; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.util.Lists.list; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * Unit tests for {@link FunctionInvoker} custom result handling. + * + * @author Chris Bono + */ +class CustomFunctionInvokerTests { + + private FunctionInvoker currentInvoker; + + @AfterEach + void closeCurrentInvoker() { + if (this.currentInvoker != null) { + this.currentInvoker.close(); + } + } + + /** + * Verifies custom result handling and proper post-process callback invocation for an imperative function. + */ + @Test + void customImperativeResultHandling() { + FunctionInvoker invoker = new FunctionInvoker(TestFunctionsConfig.class) { + @Override + protected String postProcessImperativeFunctionResult(String rawInputs, Object functionInputs, + Object functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + return functionResult + "+imperative"; + } + }; + invoker = spyOnAndCloseAfterTest(invoker); + ExecutionContext executionContext = new TestExecutionContext("imperativeUppercase"); + String result = invoker.handleRequest("foo", executionContext); + assertThat(result).isEqualTo("FOO+imperative"); + + // Below here verifies that the expected callback(s) were invoked w/ the expected arguments + + // Only imperative post-process callback should be called + verify(invoker, never()).postProcessReactiveFunctionResult(anyString(), any(), any(Publisher.class), any(), same(executionContext)); + verify(invoker, never()).postProcessMonoFunctionResult(anyString(), any(), any(Mono.class), any(), same(executionContext)); + verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext)); + + // Only sniff-test the payload of the input message (the other fields are problematic to verify and no value doing that here) + ArgumentCaptor functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class); + verify(invoker).postProcessImperativeFunctionResult(eq("foo"), functionInputsCaptor.capture(), eq("FOO"), any(), same(executionContext)); + assertThat(functionInputsCaptor.getValue()).extracting(GenericMessage::getPayload).isEqualTo("foo"); + } + + /** + * Verifies custom result handling and proper post-process callback invocation for a reactive Mono function. + */ + @Test + void customReactiveMonoResultHandling() { + FunctionInvoker invoker = new FunctionInvoker(TestFunctionsConfig.class) { + @Override + protected String postProcessMonoFunctionResult(String rawInputs, Object functionInputs, Mono functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + return functionResult.block().toString() + "+mono"; + } + }; + invoker = spyOnAndCloseAfterTest(invoker); + ExecutionContext executionContext = new TestExecutionContext("reactiveMonoUppercase"); + String result = invoker.handleRequest("foo", executionContext); + assertThat(result).isEqualTo("FOO+mono"); + + // Below here verifies that the expected callback(s) were invoked w/ the expected arguments + + // Only publisher->mono post-process callbacks should be called + verify(invoker, never()).postProcessImperativeFunctionResult(anyString(), any(), any(), any(), same(executionContext)); + verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext)); + + // Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here) + ArgumentCaptor functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class); + ArgumentCaptor functionResultCaptor = ArgumentCaptor.forClass(Mono.class); + verify(invoker).postProcessReactiveFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext)); + verify(invoker).postProcessMonoFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext)); + // NOTE: The captors get called twice as the args are just delegated from publisher->mono callback + assertThat(functionInputsCaptor.getAllValues()).extracting(GenericMessage::getPayload).containsExactly("foo", "foo"); + assertThat(functionResultCaptor.getAllValues()).extracting(Mono::block).containsExactly("FOO", "FOO"); + } + + /** + * Verifies custom result handling and proper post-process callback invocation for a reactive Flux function. + */ + @Test + void customReactiveFluxResultHandling() { + FunctionInvoker, String> invoker = new FunctionInvoker, String>(TestFunctionsConfig.class) { + @Override + protected String postProcessFluxFunctionResult(List rawInputs, Object functionInputs, + Flux functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + return functionResult.map(o -> o.toString() + "+flux").collectList().block().stream().collect(Collectors.joining("/")); + } + }; + invoker = spyOnAndCloseAfterTest(invoker); + ExecutionContext executionContext = new TestExecutionContext("reactiveFluxUppercase"); + List rawInputs = Arrays.asList("foo", "bar"); + String result = invoker.handleRequest(rawInputs, executionContext); + assertThat(result).isEqualTo("FOO+flux/BAR+flux"); + + // Below here verifies that the expected callback(s) were invoked w/ the expected arguments + + // Only publisher->flux post-process callbacks should be called + verify(invoker, never()).postProcessImperativeFunctionResult(anyList(), any(), any(), any(), same(executionContext)); + verify(invoker, never()).postProcessMonoFunctionResult(anyList(), any(), any(Mono.class), any(), same(executionContext)); + + // Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here) + ArgumentCaptor> functionInputsCaptor = ArgumentCaptor.forClass(Flux.class); + ArgumentCaptor functionResultCaptor = ArgumentCaptor.forClass(Flux.class); + verify(invoker).postProcessReactiveFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext)); + verify(invoker).postProcessFluxFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext)); + + // NOTE: The captors get called twice as the args are just delegated from publisher->flux callback + + // The functionInputs for each call is Flux with 2 items - one for 'foo' and one for 'bar' + assertThat(functionInputsCaptor.getAllValues()) + .extracting(Flux::collectList).extracting(Mono::block) + .flatExtracting(fluxAsList -> fluxAsList.stream().collect(Collectors.toList())) + .extracting(GenericMessage::getPayload).containsExactlyInAnyOrder("foo", "bar", "foo", "bar"); + + // The functionResult for each call is a Flux w/ 2 items { "FOO", "BAR" } + assertThat(functionResultCaptor.getAllValues()) + .extracting(Flux::collectList).extracting(Mono::block) + .containsExactlyInAnyOrder(list("FOO", "BAR"), list("FOO", "BAR")); + } + + private FunctionInvoker spyOnAndCloseAfterTest(FunctionInvoker invoker) { + this.currentInvoker = invoker; + return spy(invoker); + } + + @Configuration + @EnableAutoConfiguration + static class TestFunctionsConfig { + + @Bean + public Function imperativeUppercase() { + return (s) -> s.toUpperCase(); + } + + @Bean + public Function, Mono> reactiveMonoUppercase() { + return (m) -> m.map(String::toUpperCase); + } + + @Bean + public Function, Flux> reactiveFluxUppercase() { + return (f) -> f.map(String::toUpperCase); + } + + } +} diff --git a/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java index 8bfd1c78c..b40249436 100644 --- a/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java +++ b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/Config.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,20 @@ package example; import java.util.Map; import java.util.function.Function; +import com.microsoft.azure.functions.ExecutionContext; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.function.json.JsonMapper; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; -import com.microsoft.azure.functions.ExecutionContext; - -import reactor.core.publisher.Mono; - +/** + * @author Oleg Zhurakousky + * @author Chris Bono + */ @SpringBootApplication public class Config { @@ -67,13 +71,14 @@ public class Config { }; } - @Bean public Function, Mono> uppercaseReactive() { - return mono -> mono.map(value -> { - return value.toUpperCase(); - }); + return mono -> mono.map(value -> value.toUpperCase()); } + @Bean + public Function, Flux> echoStream() { + return flux -> flux.map(value -> value.toUpperCase()); + } } diff --git a/spring-cloud-function-samples/function-sample-azure/src/main/java/example/ReactiveEchoCustomResultHandler.java b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/ReactiveEchoCustomResultHandler.java new file mode 100644 index 000000000..8f72a80bc --- /dev/null +++ b/spring-cloud-function-samples/function-sample-azure/src/main/java/example/ReactiveEchoCustomResultHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright 2022-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package example; + +import java.util.List; + +import com.microsoft.azure.functions.ExecutionContext; +import com.microsoft.azure.functions.HttpMethod; +import com.microsoft.azure.functions.HttpRequestMessage; +import com.microsoft.azure.functions.annotation.AuthorizationLevel; +import com.microsoft.azure.functions.annotation.FunctionName; +import com.microsoft.azure.functions.annotation.HttpTrigger; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; + +import org.springframework.cloud.function.adapter.azure.FunctionInvoker; +import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper; + +/** + * Sample that shows how to customize the default function result handling by operating on the {@link Flux} returned + * from the {@link Config#echoStream()} echoStream} function. + * + * @author Chris Bono + */ +public class ReactiveEchoCustomResultHandler extends FunctionInvoker, String> { + + private static final Log logger = LogFactory.getLog(ReactiveEchoCustomResultHandler.class); + + @FunctionName("echoStream") + public String execute(@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) + HttpRequestMessage> request, ExecutionContext context + ) { + return handleRequest(request.getBody(), context); + } + + @Override + protected String postProcessFluxFunctionResult(List rawInputs, Object functionInputs, Flux functionResult, + FunctionInvocationWrapper function, ExecutionContext executionContext + ) { + functionResult + .doFirst(() -> executionContext.getLogger().info("BEGIN echo post-processing work ...")) + .mapNotNull((v) -> v.toString().toUpperCase()) + .doFinally((signalType) -> executionContext.getLogger().info("END echo post-processing work")) + .subscribe((v) -> executionContext.getLogger().info(" " + v)); + return "Kicked off job for " + rawInputs; + } + +}