diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index bd9251b85..1453a9102 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2021 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -79,6 +79,7 @@ import org.springframework.util.StringUtils; * * @author Oleg Zhurakousky * @author Roman Samarev + * @author Soby Chacko * */ public class SimpleFunctionRegistry implements FunctionRegistry { @@ -957,7 +958,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry { .doOnNext((Consumer) this.target).then(); } else { - ((Consumer) this.target).accept(this.extractValueFromOriginalValueHolderIfNecessary(convertedInput)); + Object extractedValue = this.extractValueFromOriginalValueHolderIfNecessary(convertedInput); + if (extractedValue instanceof Message && + ((Message) extractedValue).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { + ((Consumer) this.target).accept(null); + } + else { + ((Consumer) this.target).accept(extractedValue); + } } return result; } @@ -1031,7 +1039,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry { else if (input instanceof Message) { input = this.filterOutHeaders((Message) input); if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) { - return FunctionTypeUtils.isMessage(type) ? input : null; + return input; } if (functionInvocationHelper != null) { @@ -1134,8 +1142,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry { convertedOutput = enhancer.apply(convertedOutput); } if (this.getTarget() instanceof PassThruFunction) { // scst-2303 - Message enrichedMessage = MessageBuilder.fromMessage((Message) convertedOutput) + Message enrichedMessage; + if (convertedOutput instanceof Message) { + enrichedMessage = MessageBuilder.fromMessage((Message) convertedOutput) .setHeader(MessageHeaders.CONTENT_TYPE, contentType[0]).build(); + } + else { + enrichedMessage = MessageBuilder.withPayload(convertedOutput) + .setHeader(MessageHeaders.CONTENT_TYPE, contentType[0]).build(); + } return messageConverter.toMessage(enrichedMessage.getPayload(), enrichedMessage.getHeaders()); }