Fix Message sanitizing logic in FunctionRSocketUtils

Only sanitize headers that are not serializable
This commit is contained in:
Oleg Zhurakousky
2021-03-05 08:43:19 +01:00
parent fcd427ca29
commit 157949df93
2 changed files with 10 additions and 7 deletions

View File

@@ -127,10 +127,7 @@ final class FunctionRSocketUtils {
key.equals(MessageHeaders.CONTENT_TYPE)) {
headers.put(key, message.getHeaders().get(key).toString());
}
else if (!key.equals("rsocketFrameType") &&
!key.equals("rsocketRequester") &&
!key.equals("rsocketResponse") &&
!key.equals("dataBufferFactory")) {
else if (!key.equals("rsocketRequester")) {
headers.put(key, message.getHeaders().get(key));
}
}

View File

@@ -97,10 +97,16 @@ class RSocketListenerFunction implements Function<Message<Flux<Object>>, Publish
Flux<?> dataFlux =
messageToProcess.getPayload()
.map((payload) -> {
if (!(payload instanceof Message)) {
payload = MessageBuilder.createMessage(payload, messageToProcess.getHeaders());
if (payload instanceof Message) {
return MessageBuilder.fromMessage((Message) payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
}
return payload;
else {
return MessageBuilder.withPayload(payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
}
// if (!(payload instanceof Message)) {
// payload = MessageBuilder.createMessage(payload, messageToProcess.getHeaders());
// }
// return payload;
});
if (this.targetFunction.getInputType() != null && FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())) {
dataFlux = dataFlux.transform((Function) this.targetFunction);