diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index ffef18541..a4b8be79e 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -1377,10 +1377,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect ? FunctionTypeUtils.getImmediateGenericType(type, 0) : type; return publisher instanceof Mono - ? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType)) - .doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex)) - : Flux.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType)) - .doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex)); + ? Mono.from(publisher).map(v -> { + try { + return this.convertInputIfNecessary(v, actualType == null ? type : actualType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert input", e); + } + }) + : Flux.from(publisher).map(v -> { + try { + return this.convertInputIfNecessary(v, actualType == null ? type : actualType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert input", e); + } + }); } /* @@ -1389,10 +1401,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect @SuppressWarnings("unchecked") private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) { return publisher instanceof Mono - ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) - .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)) - : Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) - .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)); + ? Mono.from(publisher).map(v -> { + try { + return this.convertOutputIfNecessary(v, type, expectedOutputContentType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert output", e); + } + }) + : Flux.from(publisher).map(v -> { + try { + return this.convertOutputIfNecessary(v, type, expectedOutputContentType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert output", e); + } + }); } }