Cleanup 'handleAndReply' logic in RSocketListenerFunction
This commit is contained in:
@@ -97,16 +97,9 @@ class RSocketListenerFunction implements Function<Message<Flux<Object>>, Publish
|
||||
Flux<?> dataFlux =
|
||||
messageToProcess.getPayload()
|
||||
.map((payload) -> {
|
||||
if (payload instanceof Message) {
|
||||
return MessageBuilder.fromMessage((Message) payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
|
||||
}
|
||||
else {
|
||||
return MessageBuilder.withPayload(payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
|
||||
}
|
||||
// if (!(payload instanceof Message)) {
|
||||
// payload = MessageBuilder.createMessage(payload, messageToProcess.getHeaders());
|
||||
// }
|
||||
// return payload;
|
||||
return payload instanceof Message
|
||||
? (Message) payload
|
||||
: MessageBuilder.withPayload(payload).copyHeadersIfAbsent(messageToProcess.getHeaders()).build();
|
||||
});
|
||||
if (this.targetFunction.getInputType() != null && FunctionTypeUtils.isPublisher(this.targetFunction.getInputType())) {
|
||||
dataFlux = dataFlux.transform((Function) this.targetFunction);
|
||||
|
||||
Reference in New Issue
Block a user