KafkaNull related improvements

When using KafkaNull on the outbound (StreamBridge for ex),
there is a bug that prevents it from working properly.

See https://github.com/spring-cloud/spring-cloud-stream/issues/2614 for more details.

Resolves #981
This commit is contained in:
Soby Chacko
2023-01-05 16:33:06 -05:00
committed by Oleg Zhurakousky
parent 641aef5af4
commit f5e606dc55

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -79,6 +79,7 @@ import org.springframework.util.StringUtils;
*
* @author Oleg Zhurakousky
* @author Roman Samarev
* @author Soby Chacko
*
*/
public class SimpleFunctionRegistry implements FunctionRegistry {
@@ -957,7 +958,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
.doOnNext((Consumer) this.target).then();
}
else {
((Consumer) this.target).accept(this.extractValueFromOriginalValueHolderIfNecessary(convertedInput));
Object extractedValue = this.extractValueFromOriginalValueHolderIfNecessary(convertedInput);
if (extractedValue instanceof Message &&
((Message) extractedValue).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
((Consumer) this.target).accept(null);
}
else {
((Consumer) this.target).accept(extractedValue);
}
}
return result;
}
@@ -1031,7 +1039,7 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
else if (input instanceof Message) {
input = this.filterOutHeaders((Message) input);
if (((Message) input).getPayload().getClass().getName().equals("org.springframework.kafka.support.KafkaNull")) {
return FunctionTypeUtils.isMessage(type) ? input : null;
return input;
}
if (functionInvocationHelper != null) {
@@ -1134,8 +1142,15 @@ public class SimpleFunctionRegistry implements FunctionRegistry {
convertedOutput = enhancer.apply(convertedOutput);
}
if (this.getTarget() instanceof PassThruFunction) { // scst-2303
Message enrichedMessage = MessageBuilder.fromMessage((Message) convertedOutput)
Message enrichedMessage;
if (convertedOutput instanceof Message) {
enrichedMessage = MessageBuilder.fromMessage((Message) convertedOutput)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType[0]).build();
}
else {
enrichedMessage = MessageBuilder.withPayload(convertedOutput)
.setHeader(MessageHeaders.CONTENT_TYPE, contentType[0]).build();
}
return messageConverter.toMessage(enrichedMessage.getPayload(), enrichedMessage.getHeaders());
}