Improvements RoutingFunction and MessageFunction

Added error handling to RoutingFunction
Added support for Function<Flux, Flux> to MessageFunction
This commit is contained in:
Oleg Zhurakousky
2019-05-17 15:39:54 +02:00
parent 076aa6f880
commit b8c1003309
2 changed files with 32 additions and 12 deletions

View File

@@ -26,6 +26,7 @@ import reactor.core.publisher.Mono;
import org.springframework.cloud.function.core.FluxConsumer;
import org.springframework.cloud.function.core.FluxFunction;
import org.springframework.cloud.function.core.FluxToMonoFunction;
import org.springframework.cloud.function.core.FluxedFunction;
import org.springframework.cloud.function.core.MonoToFluxFunction;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
@@ -36,7 +37,7 @@ import org.springframework.messaging.support.MessageBuilder;
* @since 2.1
*/
public class MessageFunction
implements Function<Publisher<Message<?>>, Publisher<Message<?>>> {
implements Function<Publisher<?>, Publisher<Message<?>>> {
private final Function<?, ?> delegate;
@@ -44,11 +45,18 @@ public class MessageFunction
this.delegate = delegate;
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public Publisher<Message<?>> apply(Publisher<Message<?>> input) {
Flux<Message<?>> flux = Flux.from(input);
public Publisher<Message<?>> apply(Publisher<?> input) {
Flux<Object> incomingFlux = Flux.from(input);
Flux<Message<?>> flux = incomingFlux.map(value -> {
if (!(value instanceof Message)) {
return MessageBuilder.withPayload(value).build();
}
return (Message<?>) value;
});
if (this.delegate instanceof FluxFunction) {
@SuppressWarnings("unchecked")
Function<Object, Object> target = (Function<Object, Object>) ((FluxFunction<?, ?>) this.delegate)
.getTarget();
return flux.map(
@@ -56,7 +64,6 @@ public class MessageFunction
.copyHeaders(value.getHeaders()).build());
}
if (this.delegate instanceof MonoToFluxFunction) {
@SuppressWarnings("unchecked")
Function<Mono<Object>, Flux<Object>> target = ((MonoToFluxFunction<Object, Object>) this.delegate)
.getTarget();
return flux.next()
@@ -65,7 +72,6 @@ public class MessageFunction
.copyHeaders(value.getHeaders()).build()));
}
if (this.delegate instanceof FluxToMonoFunction) {
@SuppressWarnings("unchecked")
Function<Flux<Object>, Mono<Object>> target = ((FluxToMonoFunction<Object, Object>) this.delegate)
.getTarget();
AtomicReference<MessageHeaders> headers = new AtomicReference<>();
@@ -76,7 +82,6 @@ public class MessageFunction
.copyHeaders(headers.get()).build());
}
if (this.delegate instanceof FluxConsumer) {
@SuppressWarnings("unchecked")
FluxConsumer<Object> target = ((FluxConsumer<Object>) this.delegate);
AtomicReference<MessageHeaders> headers = new AtomicReference<>();
Mono<Void> mapped = target.apply(flux.map(messsage -> {
@@ -85,11 +90,17 @@ public class MessageFunction
}));
return mapped.map(value -> MessageBuilder.createMessage(null, headers.get()));
}
// TODO: cover the case that delegate is actually Function<Flux,Flux>
@SuppressWarnings("unchecked")
Function<Object, Object> function = (Function<Object, Object>) this.delegate;
if (this.delegate instanceof FluxedFunction) {
Function<Flux<Object>, Flux<Object>> target = ((FluxedFunction) this.delegate);
return (Flux) flux.map(value -> ((Message) value).getPayload()).transform(target);
}
Function function = this.delegate;
return flux.map(
value -> MessageBuilder.withPayload(function.apply(value.getPayload()))
.copyHeaders(value.getHeaders()).build());
value -> {
return MessageBuilder.withPayload(function.apply(value.getPayload()))
.copyHeaders(value.getHeaders()).build();
});
}
}

View File

@@ -26,6 +26,7 @@ import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.core.WrappedFunction;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
@@ -75,13 +76,21 @@ public class RoutingFunction implements Function<Publisher<Message<?>>, Publishe
return flux.map(message -> {
Object inputValue = this.convertInput(message, function);
return inputValue;
}).transform(function);
})
.log()
.doOnError(error -> {
throw new IllegalStateException("Failed to convert Message. Possible reason; "
+ "No suitable converter was found for payload with 'contentType' "
+ signal.get().getHeaders().get(MessageHeaders.CONTENT_TYPE), error);
})
.transform(function);
});
}
@SuppressWarnings("rawtypes")
private WrappedFunction getRouteToFunction(Message<?> message) {
String routeToFunctionName = (String) message.getHeaders().get("function.name");
Assert.hasText(routeToFunctionName, "A 'function.name' was not provided as message header.");
WrappedFunction function = functionCatalog.lookup(routeToFunctionName);
Assert.notNull(function, "Failed to locate function specified with 'function.name':"
+ message.getHeaders().get("function.name"));