Introduce special handling for unconverted messages
Prevents odd looking exceptions when null pops into the stream unexpectedly.
This commit is contained in:
@@ -63,6 +63,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
|
||||
|
||||
private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();
|
||||
|
||||
private static final Object UNCONVERTED = new Object();
|
||||
|
||||
public StreamListeningFunctionInvoker(FunctionCatalog functionCatalog,
|
||||
FunctionInspector functionInspector,
|
||||
CompositeMessageConverterFactory converterFactory, String defaultEndpoint) {
|
||||
@@ -105,7 +107,8 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
|
||||
|
||||
private Flux<Message<?>> consumer(String name, Flux<Message<?>> flux) {
|
||||
Consumer<Object> consumer = functionCatalog.lookupConsumer(name);
|
||||
consumer.accept(flux.map(message -> convertInput(consumer).apply(message)));
|
||||
consumer.accept(flux.map(message -> convertInput(consumer).apply(message))
|
||||
.filter(transformed -> transformed != UNCONVERTED));
|
||||
return Flux.empty();
|
||||
}
|
||||
|
||||
@@ -200,12 +203,17 @@ public class StreamListeningFunctionInvoker implements SmartInitializingSingleto
|
||||
}
|
||||
|
||||
private Object convertPayload(Class<?> inputType, Message<?> m) {
|
||||
Object result;
|
||||
if (inputType.isAssignableFrom(m.getPayload().getClass())) {
|
||||
return m.getPayload();
|
||||
result = m.getPayload();
|
||||
}
|
||||
else {
|
||||
return this.converter.fromMessage(m, inputType);
|
||||
result = this.converter.fromMessage(m, inputType);
|
||||
}
|
||||
if (result==null) {
|
||||
result = UNCONVERTED;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
interface FluxMessageProcessor {
|
||||
|
||||
Reference in New Issue
Block a user