KafkaNull check improvements

Related to f5e606dc55

Resolves #983
This commit is contained in:
Soby Chacko
2023-01-06 11:52:35 -05:00
committed by Oleg Zhurakousky
parent 49ec082746
commit 8a5eafa58f

View File

@@ -879,7 +879,13 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
logger.debug("Invoking function: " + this + "with input type: " + this.getInputType());
}
Object result = ((Function) this.target).apply(inputValue);
Object result;
if (inputValue != null && inputValue.getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
result = ((Function) this.target).apply(null);
}
else {
result = ((Function) this.target).apply(inputValue);
}
if (result instanceof Publisher && functionInvocationHelper != null) {
result = this.postProcessFunction((Publisher) result, firstInputMessage);