GH-796 Fix error handling for reactive input/ouput conversion
Resolves #796
This commit is contained in:
@@ -1377,10 +1377,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
? FunctionTypeUtils.getImmediateGenericType(type, 0)
|
||||
: type;
|
||||
return publisher instanceof Mono
|
||||
? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType))
|
||||
.doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex))
|
||||
: Flux.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType))
|
||||
.doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex));
|
||||
? Mono.from(publisher).map(v -> {
|
||||
try {
|
||||
return this.convertInputIfNecessary(v, actualType == null ? type : actualType);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert input", e);
|
||||
}
|
||||
})
|
||||
: Flux.from(publisher).map(v -> {
|
||||
try {
|
||||
return this.convertInputIfNecessary(v, actualType == null ? type : actualType);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert input", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -1389,10 +1401,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
|
||||
@SuppressWarnings("unchecked")
|
||||
private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) {
|
||||
return publisher instanceof Mono
|
||||
? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
||||
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex))
|
||||
: Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType))
|
||||
.doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex));
|
||||
? Mono.from(publisher).map(v -> {
|
||||
try {
|
||||
return this.convertOutputIfNecessary(v, type, expectedOutputContentType);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert output", e);
|
||||
}
|
||||
})
|
||||
: Flux.from(publisher).map(v -> {
|
||||
try {
|
||||
return this.convertOutputIfNecessary(v, type, expectedOutputContentType);
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed to convert output", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user