Consolidate isMessage() operation

This commit is contained in:
Oleg Zhurakousky
2020-10-19 14:57:55 +02:00
parent 14918ebf16
commit 10b1f808ba
7 changed files with 25 additions and 16 deletions

View File

@@ -244,7 +244,8 @@ public class RequestProcessor {
+ "`");
}
if (this.inspector.isMessage(function)) {
if (function != null && ((FunctionInvocationWrapper) function).isInputTypeMessage()) {
flux = messages(wrapper, function, flux);
}
Mono<ResponseEntity<?>> responseEntityMono = null;
@@ -324,7 +325,7 @@ public class RequestProcessor {
private Mono<ResponseEntity<?>> stream(FunctionWrapper request, Publisher<?> result) {
BodyBuilder builder = ResponseEntity.ok();
if (this.inspector.isMessage(request.handler())) {
if (((FunctionInvocationWrapper) request.handler()).isInputTypeMessage()) {
result = Flux.from(result)
.doOnNext(value -> addHeaders(builder, (Message<?>) value))
.map(message -> MessageUtils.unpack(request.handler(), message)
@@ -453,7 +454,7 @@ public class RequestProcessor {
private Publisher<?> value(FunctionWrapper wrapper) {
Flux<?> input = Flux.from(wrapper.argument)
.map(body -> this.converter.convert(wrapper.function, body));
if (this.inspector.isMessage(wrapper.function)) {
if (((FunctionInvocationWrapper) (Object) wrapper.function).isInputTypeMessage()) {
input = messages(wrapper, wrapper.function, input);
}
return Mono.from(wrapper.function.apply(input));