SCST-2065 Fix deserialization error when multi IO signature
Fix deserializatioin error when using complex signatures such as Function<Tuple2<Flux<Message<String>> The issue originated from spring cloud stream https://github.com/spring-cloud/spring-cloud-stream/issues/2065
This commit is contained in:
@@ -59,7 +59,12 @@ class CloudEventsFunctionInvocationHelper implements FunctionInvocationHelper<Me
|
||||
|
||||
@Override
|
||||
public Message<?> preProcessInput(Message<?> input, Object inputConverter) {
|
||||
return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter);
|
||||
try {
|
||||
return CloudEventMessageUtils.toCanonical(input, (MessageConverter) inputConverter);
|
||||
}
|
||||
catch (Exception e) {
|
||||
return input;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -982,7 +982,10 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
*
|
||||
*/
|
||||
private boolean isConversionHintRequired(Type actualType, Class<?> rawType) {
|
||||
return rawType != actualType; // && !FunctionTypeUtils.isMessage(actualType);
|
||||
if (Collection.class.isAssignableFrom(rawType) || Map.class.isAssignableFrom(rawType)) {
|
||||
return true;
|
||||
}
|
||||
return rawType != actualType && !FunctionTypeUtils.isMessage(actualType);
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1005,23 +1008,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
|
||||
Object convertedInput = message.getPayload();
|
||||
|
||||
// if (FunctionTypeUtils.isMessage(type) && FunctionTypeUtils.getRawType(type)) {
|
||||
//
|
||||
// }
|
||||
|
||||
|
||||
|
||||
// type = this.extractActualValueTypeIfNecessary(type);
|
||||
|
||||
Type itemType = this.extractActualValueTypeIfNecessary(type);
|
||||
Class<?> rawType = FunctionTypeUtils.isMessage(type)
|
||||
? FunctionTypeUtils.getRawType(itemType)
|
||||
: FunctionTypeUtils.getRawType(type);
|
||||
// if (!rawType.isAssignableFrom(message.getPayload().getClass())) {
|
||||
convertedInput = this.isConversionHintRequired(type, rawType)
|
||||
? SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType, itemType)
|
||||
: SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType);
|
||||
// }
|
||||
convertedInput = this.isConversionHintRequired(type, rawType)
|
||||
? SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType, itemType)
|
||||
: SimpleFunctionRegistry.this.messageConverter.fromMessage(message, rawType);
|
||||
|
||||
|
||||
if (FunctionTypeUtils.isMessage(type)) {
|
||||
@@ -1113,11 +1106,9 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
else if (FunctionTypeUtils.isFlux(type) && publisher instanceof Mono) {
|
||||
publisher = Flux.from(publisher);
|
||||
}
|
||||
Type actualType = type != null && FunctionTypeUtils.isPublisher(type) ? FunctionTypeUtils.getImmediateGenericType(type, 0) : type;
|
||||
// Type actualType = type != null ? FunctionTypeUtils.getGenericType(type, 0) : type;
|
||||
// if (actualType == null) {
|
||||
// actualType = type;
|
||||
// }
|
||||
Type actualType = type != null && FunctionTypeUtils.isPublisher(type)
|
||||
? FunctionTypeUtils.getImmediateGenericType(type, 0)
|
||||
: type;
|
||||
return publisher instanceof Mono
|
||||
? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType))
|
||||
.doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex))
|
||||
|
||||
Reference in New Issue
Block a user