diff --git a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java index 309163b64..d0b91abb8 100644 --- a/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java +++ b/spring-cloud-function-stream/src/main/java/org/springframework/cloud/function/stream/StreamListeningFunctionInvoker.java @@ -63,6 +63,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty(); + private static final Object UNCONVERTED = new Object(); + public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultEndpoint) { @@ -105,7 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto private Flux> consumer(String name, Flux> flux) { Consumer consumer = functionCatalog.lookupConsumer(name); - consumer.accept(flux.map(message -> convertInput(consumer).apply(message))); + consumer.accept(flux.map(message -> convertInput(consumer).apply(message)) + .filter(transformed -> transformed != UNCONVERTED)); return Flux.empty(); } @@ -200,12 +203,17 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto } private Object convertPayload(Class inputType, Message m) { + Object result; if (inputType.isAssignableFrom(m.getPayload().getClass())) { - return m.getPayload(); + result = m.getPayload(); } else { - return this.converter.fromMessage(m, inputType); + result = this.converter.fromMessage(m, inputType); } + if (result==null) { + result = UNCONVERTED; + } + return result; } interface FluxMessageProcessor {