Fix HTTP header propagation logic

Related to GH-422 and GH-606
This commit is contained in:
Oleg Zhurakousky
2020-11-13 17:47:41 +01:00
parent 311649c312
commit 7ddbbe52cd
3 changed files with 28 additions and 2 deletions

View File

@@ -113,6 +113,15 @@ public class CloudeventDemoApplication {
};
}
@Bean
public Function<Map<String, Object>, Map<String, Object>> consumeAndProduceCloudEventAsPojoToPojo() {
return ceMessage -> {
ceMessage.put("version", "10.0");
ceMessage.put("releaseDate", "01-10-2050");
return ceMessage;
};
}
@Bean
public CloudEventAttributesProvider cloudEventAttributesProvider() {
return new CustomCloudEventAtttributesProvider();

View File

@@ -191,7 +191,24 @@ public class RequestProcessor {
.map(message -> message.getPayload());
}
else {
builder.headers(HeaderUtils.sanitize(request.headers()));
if (result instanceof Mono) {
result = Mono.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> {
builder.headers(HeaderUtils.sanitize(request.headers()));
addHeaders(builder, value);
})
.map(message -> message.getPayload());
}
else {
result = Flux.from(result)
.map(message -> MessageUtils.unpack(handler, message))
.doOnNext(value -> {
builder.headers(HeaderUtils.sanitize(request.headers()));
addHeaders(builder, value);
})
.map(message -> message.getPayload());
}
}
if (isOutputSingle(handler)

View File

@@ -115,7 +115,7 @@ public final class FunctionWebUtils {
}
else if (result instanceof Message) {
if (!isMessage) {
result = ((Message) result).getPayload();
// result = ((Message) result).getPayload();
}
else if (((Message) result).getPayload() instanceof byte[]) {
String str = new String((byte[]) ((Message) result).getPayload());