Modify logic for header copy from input to output

This is primarily related to Cloud Events. Since we delegate to a separate class for post processing, if outpt message is Cloud Event we will not be doing anything to with regard to header copy in SimpleFunctionRegistry and unstead delegate it to CloudEventFunctionInvocationHelper
This commit is contained in:
Oleg Zhurakousky
2020-12-17 12:18:07 +01:00
parent 7ab6cc446b
commit eb95e1dd78
8 changed files with 23 additions and 792 deletions

View File

@@ -178,6 +178,13 @@ public final class CloudEventMessageBuilder<T> {
else if (key.startsWith(CloudEventMessageUtils.KAFKA_ATTR_PREFIX)) {
this.swapPrefix(key, CloudEventMessageUtils.KAFKA_ATTR_PREFIX, attributePrefixToUse);
}
else if (key.equals(CloudEventMessageUtils._ID) || key.equals(CloudEventMessageUtils._SPECVERSION) ||
key.equals(CloudEventMessageUtils._SOURCE) || key.equals(CloudEventMessageUtils._TYPE) ||
key.equals(CloudEventMessageUtils._DATASCHEMA) || key.equals(CloudEventMessageUtils._SCHEMAURL) ||
key.equals(CloudEventMessageUtils._SUBJECT) || key.equals(CloudEventMessageUtils._TIME) ||
key.equals(CloudEventMessageUtils._DATACONTENTTYPE)) {
this.swapPrefix(key, "", attributePrefixToUse);
}
}
}
return doBuild(attributePrefixToUse);
@@ -197,6 +204,13 @@ public final class CloudEventMessageBuilder<T> {
this.headers.put(prefix + CloudEventMessageUtils._ID, UUID.randomUUID().toString());
}
this.headers.put(MessageUtils.MESSAGE_TYPE, CloudEventMessageUtils.CLOUDEVENT_VALUE);
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._TYPE)) {
this.headers.put(prefix + CloudEventMessageUtils._TYPE, this.data.getClass().getName());
}
if (!this.headers.containsKey(prefix + CloudEventMessageUtils._SOURCE)) {
this.headers.put(prefix + CloudEventMessageUtils._SOURCE, URI.create("https://spring.io/" + this.data.getClass().getName()));
}
MessageHeaders headers = new MessageHeaders(this.headers);
GenericMessage<T> message = new GenericMessage<T>(this.data, headers);
Assert.isTrue(CloudEventMessageUtils.isCloudEvent(message), "The message does not appear to be a valid Cloud Event, "

View File

@@ -317,6 +317,10 @@ public final class CloudEventMessageUtils {
&& message.getHeaders().containsKey(TYPE)
&& message.getHeaders().containsKey(SOURCE))
||
(message.getHeaders().containsKey(_SPECVERSION)
&& message.getHeaders().containsKey(_TYPE)
&& message.getHeaders().containsKey(_SOURCE))
||
(message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SPECVERSION)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _TYPE)
&& message.getHeaders().containsKey(AMQP_ATTR_PREFIX + _SOURCE))

View File

@@ -620,12 +620,14 @@ public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspect
private Object enrichInvocationResultIfNecessary(Object input, Object result) {
if (result != null && !(result instanceof Publisher) && input instanceof Message) {
if (result instanceof Message) {
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
result = functionInvocationHelper.postProcessResult(result, (Message) input);
}
else {
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
}
}
else {
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {