@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2021-2021 the original author or authors.
|
||||
* Copyright 2021-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -65,6 +65,7 @@ import org.springframework.util.StringUtils;
|
||||
* @param <I> input type
|
||||
* @param <O> result type
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Chris Bono
|
||||
* @since 3.2
|
||||
*/
|
||||
public class FunctionInvoker<I, O> {
|
||||
@@ -120,37 +121,117 @@ public class FunctionInvoker<I, O> {
|
||||
return function;
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
@SuppressWarnings({"unchecked", "rawtypes"})
|
||||
public O handleRequest(I input, ExecutionContext executionContext) {
|
||||
String functionDefinition = executionContext.getFunctionName();
|
||||
FunctionInvocationWrapper function = this.discoverFunction(functionDefinition);
|
||||
Object enhancedInput = enhanceInputIfNecessary(input, executionContext);
|
||||
|
||||
Object output = function.apply(enhancedInput);
|
||||
if (output instanceof Publisher) {
|
||||
if (FunctionTypeUtils.isMono(function.getOutputType())) {
|
||||
return (O) this.convertOutputIfNecessary(input, Mono.from((Publisher) output).blockOptional().get());
|
||||
Object functionResult = function.apply(enhancedInput);
|
||||
|
||||
if (!(functionResult instanceof Publisher)) {
|
||||
return postProcessImperativeFunctionResult(input, enhancedInput, functionResult, function, executionContext);
|
||||
}
|
||||
return postProcessReactiveFunctionResult(input, enhancedInput, (Publisher<?>) functionResult, function, executionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post-processes the result from a non-reactive function invocation before returning it to the Azure
|
||||
* runtime. The default behavior is to {@link #convertOutputIfNecessary possibly convert} the result.
|
||||
*
|
||||
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
|
||||
*
|
||||
* @param rawInputs the inputs passed in from the Azure runtime
|
||||
* @param functionInputs the actual inputs used for the function invocation; may be
|
||||
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
|
||||
* @param functionResult the result from the function invocation
|
||||
* @param function the invoked function
|
||||
* @param executionContext the Azure execution context
|
||||
* @return the possibly modified function results
|
||||
*/
|
||||
protected O postProcessImperativeFunctionResult(I rawInputs, Object functionInputs, Object functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
return (O) this.convertOutputIfNecessary(rawInputs, functionResult);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post-processes the result from a reactive function invocation before returning it to the Azure
|
||||
* runtime. The default behavior is to delegate to {@link #postProcessMonoFunctionResult} or
|
||||
* {@link #postProcessFluxFunctionResult} based on the result type.
|
||||
*
|
||||
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
|
||||
*
|
||||
* @param rawInputs the inputs passed in from the Azure runtime
|
||||
* @param functionInputs the actual inputs used for the function invocation; may be
|
||||
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
|
||||
* @param functionResult the result from the function invocation
|
||||
* @param function the invoked function
|
||||
* @param executionContext the Azure execution context
|
||||
* @return the possibly modified function results
|
||||
*/
|
||||
protected O postProcessReactiveFunctionResult(I rawInputs, Object functionInputs, Publisher<?> functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
if (FunctionTypeUtils.isMono(function.getOutputType())) {
|
||||
return postProcessMonoFunctionResult(rawInputs, functionInputs, Mono.from(functionResult), function, executionContext);
|
||||
}
|
||||
return postProcessFluxFunctionResult(rawInputs, functionInputs, Flux.from(functionResult), function, executionContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Post-processes the {@code Mono} result from a reactive function invocation before returning it to the Azure
|
||||
* runtime. The default behavior is to {@link Mono#blockOptional()} and {@link #convertOutputIfNecessary possibly convert} the result.
|
||||
*
|
||||
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
|
||||
*
|
||||
* @param rawInputs the inputs passed in from the Azure runtime
|
||||
* @param functionInputs the actual inputs used for the function invocation; may be
|
||||
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
|
||||
* @param functionResult the Mono result from the function invocation
|
||||
* @param function the invoked function
|
||||
* @param executionContext the Azure execution context
|
||||
* @return the possibly modified function results
|
||||
*/
|
||||
protected O postProcessMonoFunctionResult(I rawInputs, Object functionInputs, Mono<?> functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
return (O) this.convertOutputIfNecessary(rawInputs, functionResult.blockOptional().get());
|
||||
}
|
||||
|
||||
/**
|
||||
* Post-processes the {@code Flux} result from a reactive function invocation before returning it to the Azure
|
||||
* runtime. The default behavior is to {@link Flux#toIterable() block} and {@link #convertOutputIfNecessary possibly convert} the results.
|
||||
*
|
||||
* <p>Provides a hook for custom function invokers to extend/modify the function results handling.
|
||||
*
|
||||
* @param rawInputs the inputs passed in from the Azure runtime
|
||||
* @param functionInputs the actual inputs used for the function invocation; may be
|
||||
* {@link #enhanceInputIfNecessary different} from the {@literal rawInputs}
|
||||
* @param functionResult the Mono result from the function invocation
|
||||
* @param function the invoked function
|
||||
* @param executionContext the Azure execution context
|
||||
* @return the possibly modified function results
|
||||
*/
|
||||
protected O postProcessFluxFunctionResult(I rawInputs, Object functionInputs, Flux<?> functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
List resultList = new ArrayList<>();
|
||||
for (Object resultItem : functionResult.toIterable()) {
|
||||
if (resultItem instanceof Collection) {
|
||||
resultList.addAll((Collection) resultItem);
|
||||
}
|
||||
else {
|
||||
List resultList = new ArrayList<>();
|
||||
for (Object resultItem : Flux.from((Publisher) output).toIterable()) {
|
||||
if (resultItem instanceof Collection) {
|
||||
resultList.addAll((Collection) resultItem);
|
||||
}
|
||||
else {
|
||||
if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
|
||||
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
|
||||
return (O) this.convertOutputIfNecessary(input, resultItem);
|
||||
}
|
||||
else {
|
||||
resultList.add(resultItem);
|
||||
}
|
||||
}
|
||||
if (!function.isSupplier() && Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getInputType()))
|
||||
&& !Collection.class.isAssignableFrom(FunctionTypeUtils.getRawType(function.getOutputType()))) {
|
||||
return (O) this.convertOutputIfNecessary(rawInputs, resultItem);
|
||||
}
|
||||
else {
|
||||
resultList.add(resultItem);
|
||||
}
|
||||
return (O) this.convertOutputIfNecessary(input, resultList);
|
||||
}
|
||||
}
|
||||
return (O) this.convertOutputIfNecessary(input, output);
|
||||
return (O) this.convertOutputIfNecessary(rawInputs, resultList);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
/*
|
||||
* Copyright 2022-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.function.adapter.azure;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.reactivestreams.Publisher;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.messaging.support.GenericMessage;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.util.Lists.list;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyList;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* Unit tests for {@link FunctionInvoker} custom result handling.
|
||||
*
|
||||
* @author Chris Bono
|
||||
*/
|
||||
class CustomFunctionInvokerTests {
|
||||
|
||||
private FunctionInvoker<?, ?> currentInvoker;
|
||||
|
||||
@AfterEach
|
||||
void closeCurrentInvoker() {
|
||||
if (this.currentInvoker != null) {
|
||||
this.currentInvoker.close();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies custom result handling and proper post-process callback invocation for an imperative function.
|
||||
*/
|
||||
@Test
|
||||
void customImperativeResultHandling() {
|
||||
FunctionInvoker<String, String> invoker = new FunctionInvoker<String, String>(TestFunctionsConfig.class) {
|
||||
@Override
|
||||
protected String postProcessImperativeFunctionResult(String rawInputs, Object functionInputs,
|
||||
Object functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
return functionResult + "+imperative";
|
||||
}
|
||||
};
|
||||
invoker = spyOnAndCloseAfterTest(invoker);
|
||||
ExecutionContext executionContext = new TestExecutionContext("imperativeUppercase");
|
||||
String result = invoker.handleRequest("foo", executionContext);
|
||||
assertThat(result).isEqualTo("FOO+imperative");
|
||||
|
||||
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
|
||||
|
||||
// Only imperative post-process callback should be called
|
||||
verify(invoker, never()).postProcessReactiveFunctionResult(anyString(), any(), any(Publisher.class), any(), same(executionContext));
|
||||
verify(invoker, never()).postProcessMonoFunctionResult(anyString(), any(), any(Mono.class), any(), same(executionContext));
|
||||
verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext));
|
||||
|
||||
// Only sniff-test the payload of the input message (the other fields are problematic to verify and no value doing that here)
|
||||
ArgumentCaptor<GenericMessage> functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class);
|
||||
verify(invoker).postProcessImperativeFunctionResult(eq("foo"), functionInputsCaptor.capture(), eq("FOO"), any(), same(executionContext));
|
||||
assertThat(functionInputsCaptor.getValue()).extracting(GenericMessage::getPayload).isEqualTo("foo");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies custom result handling and proper post-process callback invocation for a reactive Mono function.
|
||||
*/
|
||||
@Test
|
||||
void customReactiveMonoResultHandling() {
|
||||
FunctionInvoker<String, String> invoker = new FunctionInvoker<String, String>(TestFunctionsConfig.class) {
|
||||
@Override
|
||||
protected String postProcessMonoFunctionResult(String rawInputs, Object functionInputs, Mono<?> functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
return functionResult.block().toString() + "+mono";
|
||||
}
|
||||
};
|
||||
invoker = spyOnAndCloseAfterTest(invoker);
|
||||
ExecutionContext executionContext = new TestExecutionContext("reactiveMonoUppercase");
|
||||
String result = invoker.handleRequest("foo", executionContext);
|
||||
assertThat(result).isEqualTo("FOO+mono");
|
||||
|
||||
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
|
||||
|
||||
// Only publisher->mono post-process callbacks should be called
|
||||
verify(invoker, never()).postProcessImperativeFunctionResult(anyString(), any(), any(), any(), same(executionContext));
|
||||
verify(invoker, never()).postProcessFluxFunctionResult(anyString(), any(), any(Flux.class), any(), same(executionContext));
|
||||
|
||||
// Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here)
|
||||
ArgumentCaptor<GenericMessage> functionInputsCaptor = ArgumentCaptor.forClass(GenericMessage.class);
|
||||
ArgumentCaptor<Mono> functionResultCaptor = ArgumentCaptor.forClass(Mono.class);
|
||||
verify(invoker).postProcessReactiveFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
|
||||
verify(invoker).postProcessMonoFunctionResult(eq("foo"), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
|
||||
// NOTE: The captors get called twice as the args are just delegated from publisher->mono callback
|
||||
assertThat(functionInputsCaptor.getAllValues()).extracting(GenericMessage::getPayload).containsExactly("foo", "foo");
|
||||
assertThat(functionResultCaptor.getAllValues()).extracting(Mono::block).containsExactly("FOO", "FOO");
|
||||
}
|
||||
|
||||
/**
|
||||
* Verifies custom result handling and proper post-process callback invocation for a reactive Flux function.
|
||||
*/
|
||||
@Test
|
||||
void customReactiveFluxResultHandling() {
|
||||
FunctionInvoker<List<String>, String> invoker = new FunctionInvoker<List<String>, String>(TestFunctionsConfig.class) {
|
||||
@Override
|
||||
protected String postProcessFluxFunctionResult(List<String> rawInputs, Object functionInputs,
|
||||
Flux<?> functionResult, FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
return functionResult.map(o -> o.toString() + "+flux").collectList().block().stream().collect(Collectors.joining("/"));
|
||||
}
|
||||
};
|
||||
invoker = spyOnAndCloseAfterTest(invoker);
|
||||
ExecutionContext executionContext = new TestExecutionContext("reactiveFluxUppercase");
|
||||
List<String> rawInputs = Arrays.asList("foo", "bar");
|
||||
String result = invoker.handleRequest(rawInputs, executionContext);
|
||||
assertThat(result).isEqualTo("FOO+flux/BAR+flux");
|
||||
|
||||
// Below here verifies that the expected callback(s) were invoked w/ the expected arguments
|
||||
|
||||
// Only publisher->flux post-process callbacks should be called
|
||||
verify(invoker, never()).postProcessImperativeFunctionResult(anyList(), any(), any(), any(), same(executionContext));
|
||||
verify(invoker, never()).postProcessMonoFunctionResult(anyList(), any(), any(Mono.class), any(), same(executionContext));
|
||||
|
||||
// Only sniff-test the payload of the input message and the mono (the other fields are problematic to verify and no value doing that here)
|
||||
ArgumentCaptor<Flux<GenericMessage>> functionInputsCaptor = ArgumentCaptor.forClass(Flux.class);
|
||||
ArgumentCaptor<Flux> functionResultCaptor = ArgumentCaptor.forClass(Flux.class);
|
||||
verify(invoker).postProcessReactiveFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
|
||||
verify(invoker).postProcessFluxFunctionResult(same(rawInputs), functionInputsCaptor.capture(), functionResultCaptor.capture(), any(), same(executionContext));
|
||||
|
||||
// NOTE: The captors get called twice as the args are just delegated from publisher->flux callback
|
||||
|
||||
// The functionInputs for each call is Flux<GreetingMessage> with 2 items - one for 'foo' and one for 'bar'
|
||||
assertThat(functionInputsCaptor.getAllValues())
|
||||
.extracting(Flux::collectList).extracting(Mono::block)
|
||||
.flatExtracting(fluxAsList -> fluxAsList.stream().collect(Collectors.toList()))
|
||||
.extracting(GenericMessage::getPayload).containsExactlyInAnyOrder("foo", "bar", "foo", "bar");
|
||||
|
||||
// The functionResult for each call is a Flux<String> w/ 2 items { "FOO", "BAR" }
|
||||
assertThat(functionResultCaptor.getAllValues())
|
||||
.extracting(Flux::collectList).extracting(Mono::block)
|
||||
.containsExactlyInAnyOrder(list("FOO", "BAR"), list("FOO", "BAR"));
|
||||
}
|
||||
|
||||
private <I, O> FunctionInvoker<I, O> spyOnAndCloseAfterTest(FunctionInvoker<I, O> invoker) {
|
||||
this.currentInvoker = invoker;
|
||||
return spy(invoker);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableAutoConfiguration
|
||||
static class TestFunctionsConfig {
|
||||
|
||||
@Bean
|
||||
public Function<String, String> imperativeUppercase() {
|
||||
return (s) -> s.toUpperCase();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Mono<String>, Mono<String>> reactiveMonoUppercase() {
|
||||
return (m) -> m.map(String::toUpperCase);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Flux<String>, Flux<String>> reactiveFluxUppercase() {
|
||||
return (f) -> f.map(String::toUpperCase);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright 2012-2021 the original author or authors.
|
||||
* Copyright 2012-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -19,16 +19,20 @@ package example;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.cloud.function.json.JsonMapper;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* @author Oleg Zhurakousky
|
||||
* @author Chris Bono
|
||||
*/
|
||||
@SpringBootApplication
|
||||
public class Config {
|
||||
|
||||
@@ -67,13 +71,14 @@ public class Config {
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@Bean
|
||||
public Function<Mono<String>, Mono<String>> uppercaseReactive() {
|
||||
return mono -> mono.map(value -> {
|
||||
return value.toUpperCase();
|
||||
});
|
||||
return mono -> mono.map(value -> value.toUpperCase());
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Function<Flux<String>, Flux<String>> echoStream() {
|
||||
return flux -> flux.map(value -> value.toUpperCase());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,63 @@
|
||||
/*
|
||||
* Copyright 2022-2022 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package example;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.microsoft.azure.functions.ExecutionContext;
|
||||
import com.microsoft.azure.functions.HttpMethod;
|
||||
import com.microsoft.azure.functions.HttpRequestMessage;
|
||||
import com.microsoft.azure.functions.annotation.AuthorizationLevel;
|
||||
import com.microsoft.azure.functions.annotation.FunctionName;
|
||||
import com.microsoft.azure.functions.annotation.HttpTrigger;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
|
||||
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry.FunctionInvocationWrapper;
|
||||
|
||||
/**
|
||||
* Sample that shows how to customize the default function result handling by operating on the {@link Flux} returned
|
||||
* from the {@link Config#echoStream()} echoStream} function.
|
||||
*
|
||||
* @author Chris Bono
|
||||
*/
|
||||
public class ReactiveEchoCustomResultHandler extends FunctionInvoker<List<String>, String> {
|
||||
|
||||
private static final Log logger = LogFactory.getLog(ReactiveEchoCustomResultHandler.class);
|
||||
|
||||
@FunctionName("echoStream")
|
||||
public String execute(@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS)
|
||||
HttpRequestMessage<List<String>> request, ExecutionContext context
|
||||
) {
|
||||
return handleRequest(request.getBody(), context);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String postProcessFluxFunctionResult(List<String> rawInputs, Object functionInputs, Flux<?> functionResult,
|
||||
FunctionInvocationWrapper function, ExecutionContext executionContext
|
||||
) {
|
||||
functionResult
|
||||
.doFirst(() -> executionContext.getLogger().info("BEGIN echo post-processing work ..."))
|
||||
.mapNotNull((v) -> v.toString().toUpperCase())
|
||||
.doFinally((signalType) -> executionContext.getLogger().info("END echo post-processing work"))
|
||||
.subscribe((v) -> executionContext.getLogger().info(" " + v));
|
||||
return "Kicked off job for " + rawInputs;
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user