From d7ef882ca53fa8a1c2c0bce2bee42896c2c8fae5 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Wed, 22 Sep 2021 17:24:37 +0200 Subject: [PATCH] GH-745 Ensure RSocket support is ready to receive non-Message --- .../cloud/function/json/JsonMapper.java | 12 +++++++++ .../rsocket/MessageAwareJsonDecoder.java | 26 ++++++++++++++++--- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java index f364f4d00..b23271bca 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/json/JsonMapper.java @@ -158,4 +158,16 @@ public abstract class JsonMapper { } return isJson; } + + public static boolean isJsonStringRepresentsMap(Object value) { + boolean isJson = false; + if (value instanceof byte[]) { + value = new String((byte[]) value, StandardCharsets.UTF_8); + } + if (value instanceof String) { + String str = ((String) value).trim(); + isJson = isJsonString(value) && str.startsWith("{") && str.endsWith("}"); + } + return isJson; + } } diff --git a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java index 771de71f9..2da6714a5 100644 --- a/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java +++ b/spring-cloud-function-rsocket/src/main/java/org/springframework/cloud/function/rsocket/MessageAwareJsonDecoder.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import org.reactivestreams.Publisher; @@ -70,8 +72,19 @@ class MessageAwareJsonDecoder extends AbstractDecoder { mimeType, hints); if (messageMap.containsKey(FunctionRSocketUtils.PAYLOAD)) { Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType()); - Object payload = this.jsonMapper.fromJson( - messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType); + Object payload; + if (String.class.isAssignableFrom(FunctionTypeUtils.getRawType(targetType.getType()))) { + Object rawPayload = messageMap.get(FunctionRSocketUtils.PAYLOAD); + if (rawPayload instanceof byte[]) { + payload = new String((byte[]) rawPayload, StandardCharsets.UTF_8); + } + else { + payload = rawPayload; + } + } + else { + payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType); + } if (FunctionTypeUtils.isMessage(targetType.getType())) { return MessageBuilder.withPayload(payload).copyHeaders( @@ -93,7 +106,14 @@ class MessageAwareJsonDecoder extends AbstractDecoder { try { byte[] data = toByteArray(dataBuffer.asInputStream()); - return this.jsonMapper.fromJson(data, targetType.getType()); + if (JsonMapper.isJsonStringRepresentsMap(data)) { + return this.jsonMapper.fromJson(data, targetType.getType()); + } + else { + Map messageMap = new HashMap<>(); + messageMap.put(FunctionRSocketUtils.PAYLOAD, data); + return messageMap; + } } catch (IOException ex) { throw new IllegalStateException(ex);