Minor simplification in FunctionRSocketMessageHandler
This commit is contained in:
@@ -248,24 +248,17 @@ class FunctionRSocketMessageHandler extends RSocketMessageHandler {
|
||||
? Flux.just((DataBuffer) payload)
|
||||
: Flux.from((Publisher<DataBuffer>) payload);
|
||||
|
||||
if (MimeTypeUtils.APPLICATION_JSON_VALUE.equals(message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString())) {
|
||||
if (message.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)
|
||||
&& MimeTypeUtils.APPLICATION_JSON_VALUE.equals(message.getHeaders().get(MessageHeaders.CONTENT_TYPE).toString())) {
|
||||
Flux<Object> argument = data.map(buffer -> {
|
||||
byte[] bytePayload = this.decoder.decode(buffer, ResolvableType.forType(byte[].class), null, null);
|
||||
if (JsonMapper.isJsonString(bytePayload)) {
|
||||
// // could be array, map or string
|
||||
Object structure = this.jsonMapper.fromJson(bytePayload, Object.class);
|
||||
if (structure instanceof Map) {
|
||||
if (((Map<String, ?>) structure).containsKey(FunctionRSocketUtils.PAYLOAD)) {
|
||||
return MessageBuilder.withPayload(((Map<String, ?>) structure).remove(FunctionRSocketUtils.PAYLOAD))
|
||||
.copyHeaders((Map<String, ?>) ((Map<String, ?>) structure).get(FunctionRSocketUtils.HEADERS))
|
||||
.build();
|
||||
}
|
||||
else {
|
||||
return MessageBuilder.withPayload(structure).build();
|
||||
}
|
||||
}
|
||||
else {
|
||||
return MessageBuilder.withPayload(structure).build();
|
||||
if (structure instanceof Map && ((Map<String, ?>) structure).containsKey(FunctionRSocketUtils.PAYLOAD)) {
|
||||
return MessageBuilder.withPayload(((Map<String, ?>) structure).remove(FunctionRSocketUtils.PAYLOAD))
|
||||
.copyHeaders((Map<String, ?>) ((Map<String, ?>) structure).get(FunctionRSocketUtils.HEADERS))
|
||||
.build();
|
||||
}
|
||||
}
|
||||
return MessageBuilder.withPayload(bytePayload).copyHeadersIfAbsent(message.getHeaders()).build();
|
||||
|
||||
Reference in New Issue
Block a user