From 157949df93eb91486224e2e6433dcc26ac9539cd Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 5 Mar 2021 08:43:19 +0100 Subject: [PATCH] Fix Message sanitizing logic in FunctionRSocketUtils Only sanitize headers that are not serializable --- .../cloud/function/rsocket/FunctionRSocketUtils.java | 5 +---- .../function/rsocket/RSocketListenerFunction.java | 12 +++++++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java index 666b71358..a46e482f4 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/FunctionRSocketUtils.java @@ -127,10 +127,7 @@ final class FunctionRSocketUtils { key.equals(MessageHeaders.CONTENT_TYPE)) { headers.put(key, message.getHeaders().get(key).toString()); } - else if (!key.equals("rsocketFrameType") && - !key.equals("rsocketRequester") && - !key.equals("rsocketResponse") && - !key.equals("dataBufferFactory")) { + else if (!key.equals("rsocketRequester")) { headers.put(key, message.getHeaders().get(key)); } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java index f623feb01..e53806ba8 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/RSocketListenerFunction.java @@ -97,10 +97,16 @@ class RSocketListenerFunction implements Function>, Publish Flux dataFlux = messageToProcess.getPayload() .map((payload) -> { - if (!(payload instanceof Message)) { - payload = MessageBuilder.createMessage(payload, messageToProcess.getHeaders()); + if (payload instanceof Message) { + return MessageBuilder.fromMessage((Message) payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build(); } - return payload; + else { + return MessageBuilder.withPayload(payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build(); + } +// if (!(payload instanceof Message)) { +// payload = MessageBuilder.createMessage(payload, messageToProcess.getHeaders()); +// } +// return payload; }); if (this.targetFunction.getInputType() != null && FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())) { dataFlux = dataFlux.transform((Function) this.targetFunction);