From bb3c62a3cd09330eca916c5ae1639912d360e777 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Mon, 7 Feb 2022 18:56:18 +0100 Subject: [PATCH] GH-796 Fix error handling for reactive input/ouput conversion Resolves #796 --- .../catalog/SimpleFunctionRegistry.java | 40 +++++++++++++++---- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java index 7684cdf34..8a4fb73f9 100644 --- a/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java +++ b/spring-cloud-function-context/src/main/java/org/springframework/cloud/function/context/catalog/SimpleFunctionRegistry.java @@ -1370,10 +1370,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry { ? FunctionTypeUtils.getImmediateGenericType(type, 0) : type; return publisher instanceof Mono - ? Mono.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType)) - .doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex)) - : Flux.from(publisher).map(v -> this.convertInputIfNecessary(v, actualType == null ? type : actualType)) - .doOnError(ex -> logger.error("Failed to convert input", (Throwable) ex)); + ? Mono.from(publisher).map(v -> { + try { + return this.convertInputIfNecessary(v, actualType == null ? type : actualType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert input", e); + } + }) + : Flux.from(publisher).map(v -> { + try { + return this.convertInputIfNecessary(v, actualType == null ? type : actualType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert input", e); + } + }); } /* @@ -1382,10 +1394,22 @@ public class SimpleFunctionRegistry implements FunctionRegistry { @SuppressWarnings("unchecked") private Object convertOutputPublisherIfNecessary(Publisher publisher, Type type, String[] expectedOutputContentType) { return publisher instanceof Mono - ? Mono.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) - .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)) - : Flux.from(publisher).map(v -> this.convertOutputIfNecessary(v, type, expectedOutputContentType)) - .doOnError(ex -> logger.error("Failed to convert output", (Throwable) ex)); + ? Mono.from(publisher).map(v -> { + try { + return this.convertOutputIfNecessary(v, type, expectedOutputContentType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert output", e); + } + }) + : Flux.from(publisher).map(v -> { + try { + return this.convertOutputIfNecessary(v, type, expectedOutputContentType); + } + catch (Exception e) { + throw new IllegalStateException("Failed to convert output", e); + } + }); } }