diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java index eb7992f30..6650e07d0 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/MessageFunction.java @@ -26,6 +26,7 @@ import reactor.core.publisher.Mono; import org.springframework.cloud.function.core.FluxConsumer; import org.springframework.cloud.function.core.FluxFunction; import org.springframework.cloud.function.core.FluxToMonoFunction; +import org.springframework.cloud.function.core.FluxedFunction; import org.springframework.cloud.function.core.MonoToFluxFunction; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -36,7 +37,7 @@ import org.springframework.messaging.support.MessageBuilder; * @since 2.1 */ public class MessageFunction - implements Function>, Publisher>> { + implements Function, Publisher>> { private final Function delegate; @@ -44,11 +45,18 @@ public class MessageFunction this.delegate = delegate; } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public Publisher> apply(Publisher> input) { - Flux> flux = Flux.from(input); + public Publisher> apply(Publisher input) { + Flux incomingFlux = Flux.from(input); + Flux> flux = incomingFlux.map(value -> { + if (!(value instanceof Message)) { + return MessageBuilder.withPayload(value).build(); + } + return (Message) value; + }); + if (this.delegate instanceof FluxFunction) { - @SuppressWarnings("unchecked") Function target = (Function) ((FluxFunction) this.delegate) .getTarget(); return flux.map( @@ -56,7 +64,6 @@ public class MessageFunction .copyHeaders(value.getHeaders()).build()); } if (this.delegate instanceof MonoToFluxFunction) { - @SuppressWarnings("unchecked") Function, Flux> target = ((MonoToFluxFunction) this.delegate) .getTarget(); return flux.next() @@ -65,7 +72,6 @@ public class MessageFunction .copyHeaders(value.getHeaders()).build())); } if (this.delegate instanceof FluxToMonoFunction) { - @SuppressWarnings("unchecked") Function, Mono> target = ((FluxToMonoFunction) this.delegate) .getTarget(); AtomicReference headers = new AtomicReference<>(); @@ -76,7 +82,6 @@ public class MessageFunction .copyHeaders(headers.get()).build()); } if (this.delegate instanceof FluxConsumer) { - @SuppressWarnings("unchecked") FluxConsumer target = ((FluxConsumer) this.delegate); AtomicReference headers = new AtomicReference<>(); Mono mapped = target.apply(flux.map(messsage -> { @@ -85,11 +90,17 @@ public class MessageFunction })); return mapped.map(value -> MessageBuilder.createMessage(null, headers.get())); } + // TODO: cover the case that delegate is actually Function - @SuppressWarnings("unchecked") - Function function = (Function) this.delegate; + if (this.delegate instanceof FluxedFunction) { + Function, Flux> target = ((FluxedFunction) this.delegate); + return (Flux) flux.map(value -> ((Message) value).getPayload()).transform(target); + } + Function function = this.delegate; return flux.map( - value -> MessageBuilder.withPayload(function.apply(value.getPayload())) - .copyHeaders(value.getHeaders()).build()); + value -> { + return MessageBuilder.withPayload(function.apply(value.getPayload())) + .copyHeaders(value.getHeaders()).build(); + }); } } diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java index 3bfdc686e..fe1524b32 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/config/RoutingFunction.java @@ -26,6 +26,7 @@ import org.springframework.cloud.function.context.FunctionCatalog; import org.springframework.cloud.function.context.catalog.FunctionInspector; import org.springframework.cloud.function.core.WrappedFunction; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.converter.MessageConverter; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; @@ -75,13 +76,21 @@ public class RoutingFunction implements Function>, Publishe return flux.map(message -> { Object inputValue = this.convertInput(message, function); return inputValue; - }).transform(function); + }) + .log() + .doOnError(error -> { + throw new IllegalStateException("Failed to convert Message. Possible reason; " + + "No suitable converter was found for payload with 'contentType' " + + signal.get().getHeaders().get(MessageHeaders.CONTENT_TYPE), error); + }) + .transform(function); }); } @SuppressWarnings("rawtypes") private WrappedFunction getRouteToFunction(Message message) { String routeToFunctionName = (String) message.getHeaders().get("function.name"); + Assert.hasText(routeToFunctionName, "A 'function.name' was not provided as message header."); WrappedFunction function = functionCatalog.lookup(routeToFunctionName); Assert.notNull(function, "Failed to locate function specified with 'function.name':" + message.getHeaders().get("function.name"));