From 7ddbbe52cdb8fe9ff751ed37b8bfb82bb899bc85 Mon Sep 17 00:00:00 2001 From: Oleg Zhurakousky Date: Fri, 13 Nov 2020 17:47:41 +0100 Subject: [PATCH] Fix HTTP header propagation logic Related to GH-422 and GH-606 --- .../cloudevent/CloudeventDemoApplication.java | 9 +++++++++ .../cloud/function/web/RequestProcessor.java | 19 ++++++++++++++++++- .../function/web/util/FunctionWebUtils.java | 2 +- 3 files changed, 28 insertions(+), 2 deletions(-) diff --git a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java index b2868a10a..66979034f 100644 --- a/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java +++ b/spring-cloud-function-samples/function-sample-cloudevent/src/main/java/io/spring/cloudevent/CloudeventDemoApplication.java @@ -113,6 +113,15 @@ public class CloudeventDemoApplication { }; } + @Bean + public Function, Map> consumeAndProduceCloudEventAsPojoToPojo() { + return ceMessage -> { + ceMessage.put("version", "10.0"); + ceMessage.put("releaseDate", "01-10-2050"); + return ceMessage; + }; + } + @Bean public CloudEventAttributesProvider cloudEventAttributesProvider() { return new CustomCloudEventAtttributesProvider(); diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java index 74dcd5145..e0c13d582 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/RequestProcessor.java @@ -191,7 +191,24 @@ public class RequestProcessor { .map(message -> message.getPayload()); } else { - builder.headers(HeaderUtils.sanitize(request.headers())); + if (result instanceof Mono) { + result = Mono.from(result) + .map(message -> MessageUtils.unpack(handler, message)) + .doOnNext(value -> { + builder.headers(HeaderUtils.sanitize(request.headers())); + addHeaders(builder, value); + }) + .map(message -> message.getPayload()); + } + else { + result = Flux.from(result) + .map(message -> MessageUtils.unpack(handler, message)) + .doOnNext(value -> { + builder.headers(HeaderUtils.sanitize(request.headers())); + addHeaders(builder, value); + }) + .map(message -> message.getPayload()); + } } if (isOutputSingle(handler) diff --git a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java index f115483ac..ca0c5ca9f 100644 --- a/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java +++ b/spring-cloud-function-web/src/main/java/org/springframework/cloud/function/web/util/FunctionWebUtils.java @@ -115,7 +115,7 @@ public final class FunctionWebUtils { } else if (result instanceof Message) { if (!isMessage) { - result = ((Message) result).getPayload(); +// result = ((Message) result).getPayload(); } else if (((Message) result).getPayload() instanceof byte[]) { String str = new String((byte[]) ((Message) result).getPayload());