GH-661 Fix KafkaNull conversion

This commit is contained in:
Oleg Zhurakousky
2021-03-04 16:03:43 +01:00
parent a207d0c421
commit 731aa3fe0c

View File

@@ -842,7 +842,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
}
else if (!FunctionTypeUtils.isMessage(type)) {
if (this.payloadIsSpecialType(((Message<?>) value).getPayload())) {
return ((Message<?>) value).getPayload();
return FunctionTypeUtils.isMessage(type) ? value : null;
}
if (!((Message<?>) convertedValue).getHeaders().containsKey("scf-sink-url")) {
convertedValue = ((Message<?>) convertedValue).getPayload();