Ensure input headers are propagated the same way in reactive functions as they are in imperative

This commit is contained in:
Oleg Zhurakousky
2020-10-23 16:11:24 +02:00
parent bee53fceb3
commit 388cd6674b

View File

@@ -583,6 +583,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
else {
result = this.invokeFunctionAndEnrichResultIfNecessary(convertedInput);
if (result instanceof Flux) {
result = ((Flux) result).doOnError(ex -> logger.error("Failed to invoke function '"
+ this.functionDefinition + "'", (Throwable) ex));
}
else if (result instanceof Mono) {
result = ((Mono) result).doOnError(ex -> logger.error("Failed to invoke function '"
+ this.functionDefinition + "'", (Throwable) ex));
}
}
return result;
}
@@ -592,7 +600,24 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
@SuppressWarnings("unchecked")
private Object invokeFunctionAndEnrichResultIfNecessary(Object value) {
Object inputValue = value instanceof OriginalMessageHolder ? ((OriginalMessageHolder) value).getKey() : value;
Object inputValue;
if (value instanceof Flux) {
inputValue = ((Flux) value).map(iv -> {
return iv instanceof OriginalMessageHolder
? ((OriginalMessageHolder) iv).getKey()
: iv;
});
}
else if (value instanceof Mono) {
inputValue = ((Mono) value).map(iv -> {
return iv instanceof OriginalMessageHolder
? ((OriginalMessageHolder) iv).getKey()
: iv;
});
}
else {
inputValue = value instanceof OriginalMessageHolder ? ((OriginalMessageHolder) value).getKey() : value;
}
Object result = ((Function) this.target).apply(inputValue);
@@ -660,6 +685,11 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
input = null;
}
if (this.isSkipConversionHeaderSet(input, true)) {
if (!FunctionTypeUtils.isMessage(type)) {
input = this.isFunction()
? new OriginalMessageHolder(((Message) input).getPayload(), (Message<?>) input)
: input;
}
return input;
}
@@ -685,7 +715,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
else if (input instanceof Message) {
convertedInput = this.convertInputMessageIfNecessary((Message) input, type);
if (!FunctionTypeUtils.isMultipleArgumentType(this.inputType)) {
convertedInput = this.isPropagateInputHeaders((Message) input) ? new OriginalMessageHolder(convertedInput, (Message<?>) input) : convertedInput;
convertedInput = this.isFunction() ? new OriginalMessageHolder(convertedInput, (Message<?>) input) : convertedInput;
}
}
else {
@@ -813,13 +843,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
&& !(convertedInput instanceof OriginalMessageHolder);
}
/*
*
*/
private boolean isPropagateInputHeaders(Message message) {
return !this.isTypePublisher(this.inputType) && this.isFunction();
}
/*
*
*/
@@ -946,7 +969,6 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
*/
@SuppressWarnings("unchecked")
private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) {
//Type actualType = type;// != null ? FunctionTypeUtils.getGenericType(type) : type;
return publisher instanceof Mono
? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex))