GH-745 Ensure RSocket support is ready to receive non-Message
This commit is contained in:
@@ -158,4 +158,16 @@ public abstract class JsonMapper {
|
||||
}
|
||||
return isJson;
|
||||
}
|
||||
|
||||
public static boolean isJsonStringRepresentsMap(Object value) {
|
||||
boolean isJson = false;
|
||||
if (value instanceof byte[]) {
|
||||
value = new String((byte[]) value, StandardCharsets.UTF_8);
|
||||
}
|
||||
if (value instanceof String) {
|
||||
String str = ((String) value).trim();
|
||||
isJson = isJsonString(value) && str.startsWith("{") && str.endsWith("}");
|
||||
}
|
||||
return isJson;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.OutputStream;
|
||||
import java.lang.reflect.Type;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.reactivestreams.Publisher;
|
||||
@@ -70,8 +72,19 @@ class MessageAwareJsonDecoder extends AbstractDecoder<Object> {
|
||||
mimeType, hints);
|
||||
if (messageMap.containsKey(FunctionRSocketUtils.PAYLOAD)) {
|
||||
Type requestedType = FunctionTypeUtils.getGenericType(targetType.getType());
|
||||
Object payload = this.jsonMapper.fromJson(
|
||||
messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType);
|
||||
Object payload;
|
||||
if (String.class.isAssignableFrom(FunctionTypeUtils.getRawType(targetType.getType()))) {
|
||||
Object rawPayload = messageMap.get(FunctionRSocketUtils.PAYLOAD);
|
||||
if (rawPayload instanceof byte[]) {
|
||||
payload = new String((byte[]) rawPayload, StandardCharsets.UTF_8);
|
||||
}
|
||||
else {
|
||||
payload = rawPayload;
|
||||
}
|
||||
}
|
||||
else {
|
||||
payload = this.jsonMapper.fromJson(messageMap.get(FunctionRSocketUtils.PAYLOAD), requestedType);
|
||||
}
|
||||
|
||||
if (FunctionTypeUtils.isMessage(targetType.getType())) {
|
||||
return MessageBuilder.withPayload(payload).copyHeaders(
|
||||
@@ -93,7 +106,14 @@ class MessageAwareJsonDecoder extends AbstractDecoder<Object> {
|
||||
|
||||
try {
|
||||
byte[] data = toByteArray(dataBuffer.asInputStream());
|
||||
return this.jsonMapper.fromJson(data, targetType.getType());
|
||||
if (JsonMapper.isJsonStringRepresentsMap(data)) {
|
||||
return this.jsonMapper.fromJson(data, targetType.getType());
|
||||
}
|
||||
else {
|
||||
Map<String, Object> messageMap = new HashMap<>();
|
||||
messageMap.put(FunctionRSocketUtils.PAYLOAD, data);
|
||||
return messageMap;
|
||||
}
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IllegalStateException(ex);
|
||||
|
||||
Reference in New Issue
Block a user