From 388cd6674b9902f2181b776192ccae1680a52542 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 23 Oct 2020 16:11:24 +0200 Subject: [PATCH] Ensure input headers are propagated the same way in reactive functions as they are in imperative --- .../catalog/SimpleFunctionRegistry.java | 42 ++++++++++++++----- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 9088cb77f..6a473f866 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -583,6 +583,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect } else { result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput); + if (result instanceof Flux) { + result = ((Flux) result).doOnError(ex -> logger.error("Failed to invoke function '" + + this.functionDefinition + "'", (Throwable) ex)); + } + else if (result instanceof Mono) { + result = ((Mono) result).doOnError(ex -> logger.error("Failed to invoke function '" + + this.functionDefinition + "'", (Throwable) ex)); + } } return result; } @@ -592,7 +600,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object invokeFunctionAndEnrichResultIfNecessary(Object value) { - Object inputValue = value instanceof OriginalMessageHolder ? ((OriginalMessageHolder) value).getKey() : value; + Object inputValue; + if (value instanceof Flux) { + inputValue = ((Flux) value).map(iv -> { + return iv instanceof OriginalMessageHolder + ? ((OriginalMessageHolder) iv).getKey() + : iv; + }); + } + else if (value instanceof Mono) { + inputValue = ((Mono) value).map(iv -> { + return iv instanceof OriginalMessageHolder + ? ((OriginalMessageHolder) iv).getKey() + : iv; + }); + } + else { + inputValue = value instanceof OriginalMessageHolder ? ((OriginalMessageHolder) value).getKey() : value; + } Object result = ((Function) this.target).apply(inputValue); @@ -660,6 +685,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect input = null; } if (this.isSkipConversionHeaderSet(input, true)) { + if (!FunctionTypeUtils.isMessage(type)) { + input = this.isFunction() + ? new OriginalMessageHolder(((Message) input).getPayload(), (Message) input) + : input; + } return input; } @@ -685,7 +715,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect else if (input instanceof Message) { convertedInput = this.convertInputMessageIfNecessary((Message) input, type); if (!FunctionTypeUtils.isMultipleArgumentType(this.inputType)) { - convertedInput = this.isPropagateInputHeaders((Message) input) ? new OriginalMessageHolder(convertedInput, (Message) input) : convertedInput; + convertedInput = this.isFunction() ? new OriginalMessageHolder(convertedInput, (Message) input) : convertedInput; } } else { @@ -813,13 +843,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect && !(convertedInput instanceof OriginalMessageHolder); } - /* - * - */ - private boolean isPropagateInputHeaders(Message message) { - return !this.isTypePublisher(this.inputType) && this.isFunction(); - } - /* * */ @@ -946,7 +969,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect */ @SuppressWarnings("unchecked") private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) { - //Type actualType = type;// != null ? FunctionTypeUtils.getGenericType(type) : type; return publisher instanceof Mono ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex))